mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 23:11:37 +08:00
Compare commits
1 Commits
main
...
feat/langf
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d8c2c77be6 |
@@ -58,6 +58,16 @@ _EXTRA_ENV_KEYS = frozenset({
|
|||||||
"MATRIX_PASSWORD", "MATRIX_ENCRYPTION", "MATRIX_DEVICE_ID", "MATRIX_HOME_ROOM",
|
"MATRIX_PASSWORD", "MATRIX_ENCRYPTION", "MATRIX_DEVICE_ID", "MATRIX_HOME_ROOM",
|
||||||
"MATRIX_REQUIRE_MENTION", "MATRIX_FREE_RESPONSE_ROOMS", "MATRIX_AUTO_THREAD", "MATRIX_DM_AUTO_THREAD",
|
"MATRIX_REQUIRE_MENTION", "MATRIX_FREE_RESPONSE_ROOMS", "MATRIX_AUTO_THREAD", "MATRIX_DM_AUTO_THREAD",
|
||||||
"MATRIX_RECOVERY_KEY",
|
"MATRIX_RECOVERY_KEY",
|
||||||
|
# Langfuse observability plugin — optional tuning keys + standard SDK vars
|
||||||
|
"HERMES_LANGFUSE_ENABLED", # backward-compat env var (new: plugins.langfuse.enabled in config.yaml)
|
||||||
|
"HERMES_LANGFUSE_ENV",
|
||||||
|
"HERMES_LANGFUSE_RELEASE",
|
||||||
|
"HERMES_LANGFUSE_SAMPLE_RATE",
|
||||||
|
"HERMES_LANGFUSE_MAX_CHARS",
|
||||||
|
"HERMES_LANGFUSE_DEBUG",
|
||||||
|
"LANGFUSE_PUBLIC_KEY",
|
||||||
|
"LANGFUSE_SECRET_KEY",
|
||||||
|
"LANGFUSE_BASE_URL",
|
||||||
})
|
})
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
@@ -1692,6 +1702,30 @@ OPTIONAL_ENV_VARS = {
|
|||||||
"category": "tool",
|
"category": "tool",
|
||||||
},
|
},
|
||||||
|
|
||||||
|
# ── Langfuse observability ──
|
||||||
|
"HERMES_LANGFUSE_PUBLIC_KEY": {
|
||||||
|
"description": "Langfuse project public key (pk-lf-...)",
|
||||||
|
"prompt": "Langfuse public key",
|
||||||
|
"url": "https://cloud.langfuse.com",
|
||||||
|
"password": False,
|
||||||
|
"category": "tool",
|
||||||
|
},
|
||||||
|
"HERMES_LANGFUSE_SECRET_KEY": {
|
||||||
|
"description": "Langfuse project secret key (sk-lf-...)",
|
||||||
|
"prompt": "Langfuse secret key",
|
||||||
|
"url": "https://cloud.langfuse.com",
|
||||||
|
"password": True,
|
||||||
|
"category": "tool",
|
||||||
|
},
|
||||||
|
"HERMES_LANGFUSE_BASE_URL": {
|
||||||
|
"description": "Langfuse server URL (default: https://cloud.langfuse.com)",
|
||||||
|
"prompt": "Langfuse server URL (leave empty for cloud.langfuse.com)",
|
||||||
|
"url": None,
|
||||||
|
"password": False,
|
||||||
|
"category": "tool",
|
||||||
|
"advanced": True,
|
||||||
|
},
|
||||||
|
|
||||||
# ── Messaging platforms ──
|
# ── Messaging platforms ──
|
||||||
"TELEGRAM_BOT_TOKEN": {
|
"TELEGRAM_BOT_TOKEN": {
|
||||||
"description": "Telegram bot token from @BotFather",
|
"description": "Telegram bot token from @BotFather",
|
||||||
|
|||||||
@@ -9082,7 +9082,11 @@ Examples:
|
|||||||
)
|
)
|
||||||
plugins_remove.add_argument("name", help="Plugin directory name to remove")
|
plugins_remove.add_argument("name", help="Plugin directory name to remove")
|
||||||
|
|
||||||
plugins_subparsers.add_parser("list", aliases=["ls"], help="List installed plugins")
|
plugins_list = plugins_subparsers.add_parser("list", aliases=["ls"], help="List installed plugins")
|
||||||
|
plugins_list.add_argument(
|
||||||
|
"--available", action="store_true",
|
||||||
|
help="Also show official optional plugins that are not yet installed",
|
||||||
|
)
|
||||||
|
|
||||||
plugins_enable = plugins_subparsers.add_parser(
|
plugins_enable = plugins_subparsers.add_parser(
|
||||||
"enable", help="Enable a disabled plugin"
|
"enable", help="Enable a disabled plugin"
|
||||||
|
|||||||
@@ -1,7 +1,13 @@
|
|||||||
"""``hermes plugins`` CLI subcommand — install, update, remove, and list plugins.
|
"""``hermes plugins`` CLI subcommand — install, update, remove, and list plugins.
|
||||||
|
|
||||||
Plugins are installed from Git repositories into ``~/.hermes/plugins/``.
|
Plugins can be installed from:
|
||||||
Supports full URLs and ``owner/repo`` shorthand (resolves to GitHub).
|
- Official optional plugins shipped with the repo: ``official/<category>/<name>``
|
||||||
|
- Git repositories (full URL or ``owner/repo`` GitHub shorthand)
|
||||||
|
|
||||||
|
Official plugins live in ``optional-plugins/`` inside the Hermes repo and are
|
||||||
|
copied into ``~/.hermes/plugins/`` on install — no git clone needed, no network
|
||||||
|
required. They are NOT auto-discovered from ``optional-plugins/``; only installed
|
||||||
|
copies in ``~/.hermes/plugins/`` are loaded by Hermes.
|
||||||
|
|
||||||
After install, if the plugin ships an ``after-install.md`` file it is
|
After install, if the plugin ships an ``after-install.md`` file it is
|
||||||
rendered with Rich Markdown. Otherwise a default confirmation is shown.
|
rendered with Rich Markdown. Otherwise a default confirmation is shown.
|
||||||
@@ -95,10 +101,80 @@ def _resolve_git_url(identifier: str) -> str:
|
|||||||
|
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Invalid plugin identifier: '{identifier}'. "
|
f"Invalid plugin identifier: '{identifier}'. "
|
||||||
"Use a Git URL or owner/repo shorthand."
|
"Use 'official/<category>/<name>', a Git URL, or owner/repo shorthand."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _optional_plugins_dir() -> Path:
|
||||||
|
"""Return the optional-plugins/ directory shipped with the Hermes repo."""
|
||||||
|
return Path(__file__).resolve().parent.parent / "optional-plugins"
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_official_plugin(identifier: str) -> Optional[Path]:
|
||||||
|
"""If *identifier* is 'official/<category>/<name>', return its source path.
|
||||||
|
|
||||||
|
Returns ``None`` when the identifier is not in official format or the
|
||||||
|
plugin directory does not exist.
|
||||||
|
"""
|
||||||
|
# Accept 'official/category/name' or just 'category/name' when the
|
||||||
|
# category/name path exists under optional-plugins/.
|
||||||
|
parts = identifier.strip("/").split("/")
|
||||||
|
|
||||||
|
# Strip leading 'official' prefix if present
|
||||||
|
if parts and parts[0] == "official":
|
||||||
|
parts = parts[1:]
|
||||||
|
|
||||||
|
if len(parts) < 1:
|
||||||
|
return None
|
||||||
|
|
||||||
|
base = _optional_plugins_dir()
|
||||||
|
# Try category/name (2 parts) or bare name (1 part)
|
||||||
|
for nparts in (2, 1):
|
||||||
|
if len(parts) < nparts:
|
||||||
|
continue
|
||||||
|
candidate = base.joinpath(*parts[-nparts:])
|
||||||
|
try:
|
||||||
|
resolved = candidate.resolve()
|
||||||
|
base_resolved = base.resolve()
|
||||||
|
resolved.relative_to(base_resolved) # traversal guard
|
||||||
|
except (ValueError, OSError):
|
||||||
|
continue
|
||||||
|
if resolved.is_dir() and (
|
||||||
|
(resolved / "plugin.yaml").exists() or (resolved / "__init__.py").exists()
|
||||||
|
):
|
||||||
|
return resolved
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _list_official_plugins() -> list[tuple[str, str]]:
|
||||||
|
"""Return [(identifier, description), ...] for all official optional plugins."""
|
||||||
|
base = _optional_plugins_dir()
|
||||||
|
if not base.is_dir():
|
||||||
|
return []
|
||||||
|
|
||||||
|
results = []
|
||||||
|
for category_dir in sorted(base.iterdir()):
|
||||||
|
if not category_dir.is_dir() or category_dir.name.startswith("."):
|
||||||
|
continue
|
||||||
|
for plugin_dir in sorted(category_dir.iterdir()):
|
||||||
|
if not plugin_dir.is_dir() or plugin_dir.name.startswith("."):
|
||||||
|
continue
|
||||||
|
manifest_file = plugin_dir / "plugin.yaml"
|
||||||
|
desc = ""
|
||||||
|
if manifest_file.exists():
|
||||||
|
try:
|
||||||
|
import yaml
|
||||||
|
data = yaml.safe_load(manifest_file.read_text()) or {}
|
||||||
|
desc = data.get("description", "")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
identifier = f"official/{category_dir.name}/{plugin_dir.name}"
|
||||||
|
results.append((identifier, desc))
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
def _repo_name_from_url(url: str) -> str:
|
def _repo_name_from_url(url: str) -> str:
|
||||||
"""Extract the repo name from a Git URL for the plugin directory name."""
|
"""Extract the repo name from a Git URL for the plugin directory name."""
|
||||||
# Strip trailing .git and slashes
|
# Strip trailing .git and slashes
|
||||||
@@ -296,7 +372,61 @@ def cmd_install(
|
|||||||
from rich.console import Console
|
from rich.console import Console
|
||||||
|
|
||||||
console = Console()
|
console = Console()
|
||||||
|
plugins_dir = _plugins_dir()
|
||||||
|
|
||||||
|
# ── Official optional plugins (no network, copied from optional-plugins/) ──
|
||||||
|
official_src = _resolve_official_plugin(identifier)
|
||||||
|
if official_src is not None:
|
||||||
|
manifest = _read_manifest(official_src)
|
||||||
|
plugin_name = manifest.get("name") or official_src.name
|
||||||
|
target = _sanitize_plugin_name(plugin_name, plugins_dir)
|
||||||
|
|
||||||
|
if target.exists():
|
||||||
|
if not force:
|
||||||
|
console.print(
|
||||||
|
f"[red]Error:[/red] Plugin '{plugin_name}' already exists at {target}.\n"
|
||||||
|
f"Use [bold]--force[/bold] to reinstall, or "
|
||||||
|
f"[bold]hermes plugins update {plugin_name}[/bold] to update."
|
||||||
|
)
|
||||||
|
sys.exit(1)
|
||||||
|
console.print(f"[dim] Removing existing {plugin_name}...[/dim]")
|
||||||
|
shutil.rmtree(target)
|
||||||
|
|
||||||
|
console.print(f"[dim]Installing {plugin_name} from official optional plugins...[/dim]")
|
||||||
|
shutil.copytree(str(official_src), str(target))
|
||||||
|
|
||||||
|
_copy_example_files(target, console)
|
||||||
|
_prompt_plugin_env_vars(manifest, console)
|
||||||
|
_display_after_install(target, identifier)
|
||||||
|
|
||||||
|
installed_name = manifest.get("name") or target.name
|
||||||
|
should_enable = enable
|
||||||
|
if should_enable is None:
|
||||||
|
if sys.stdin.isatty() and sys.stdout.isatty():
|
||||||
|
try:
|
||||||
|
answer = input(" Enable now? [y/N] ").strip().lower()
|
||||||
|
should_enable = answer in ("y", "yes")
|
||||||
|
except (EOFError, KeyboardInterrupt):
|
||||||
|
should_enable = False
|
||||||
|
else:
|
||||||
|
should_enable = False
|
||||||
|
|
||||||
|
if should_enable:
|
||||||
|
enabled = _get_enabled_set()
|
||||||
|
disabled = _get_disabled_set()
|
||||||
|
enabled.add(installed_name)
|
||||||
|
disabled.discard(installed_name)
|
||||||
|
_save_enabled_set(enabled)
|
||||||
|
_save_disabled_set(disabled)
|
||||||
|
console.print(f" [green]✓[/green] Plugin [bold]{installed_name}[/bold] enabled.")
|
||||||
|
else:
|
||||||
|
console.print(
|
||||||
|
f" [dim]Plugin installed but not enabled. "
|
||||||
|
f"Run [bold]hermes plugins enable {installed_name}[/bold] to activate.[/dim]"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# ── Git URL / owner/repo install ──────────────────────────────────────────
|
||||||
try:
|
try:
|
||||||
git_url = _resolve_git_url(identifier)
|
git_url = _resolve_git_url(identifier)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
@@ -310,8 +440,6 @@ def cmd_install(
|
|||||||
"Consider using https:// or git@ for production installs."
|
"Consider using https:// or git@ for production installs."
|
||||||
)
|
)
|
||||||
|
|
||||||
plugins_dir = _plugins_dir()
|
|
||||||
|
|
||||||
# Clone into a temp directory first so we can read plugin.yaml for the name
|
# Clone into a temp directory first so we can read plugin.yaml for the name
|
||||||
with tempfile.TemporaryDirectory() as tmp:
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
tmp_target = Path(tmp) / "plugin"
|
tmp_target = Path(tmp) / "plugin"
|
||||||
@@ -696,16 +824,21 @@ def _discover_all_plugins() -> list:
|
|||||||
return list(seen.values())
|
return list(seen.values())
|
||||||
|
|
||||||
|
|
||||||
def cmd_list() -> None:
|
def cmd_list(available: bool = False) -> None:
|
||||||
"""List all plugins (bundled + user) with enabled/disabled state."""
|
"""List all plugins (bundled + user) with enabled/disabled state.
|
||||||
|
|
||||||
|
When *available* is True, also show official optional plugins that are
|
||||||
|
not yet installed.
|
||||||
|
"""
|
||||||
from rich.console import Console
|
from rich.console import Console
|
||||||
from rich.table import Table
|
from rich.table import Table
|
||||||
|
|
||||||
console = Console()
|
console = Console()
|
||||||
entries = _discover_all_plugins()
|
entries = _discover_all_plugins()
|
||||||
if not entries:
|
if not entries and not available:
|
||||||
console.print("[dim]No plugins installed.[/dim]")
|
console.print("[dim]No plugins installed.[/dim]")
|
||||||
console.print("[dim]Install with:[/dim] hermes plugins install owner/repo")
|
console.print("[dim]Install with:[/dim] hermes plugins install official/<category>/<name>")
|
||||||
|
console.print("[dim]Browse available:[/dim] hermes plugins list --available")
|
||||||
return
|
return
|
||||||
|
|
||||||
enabled = _get_enabled_set()
|
enabled = _get_enabled_set()
|
||||||
@@ -734,6 +867,31 @@ def cmd_list() -> None:
|
|||||||
console.print("[dim]Enable/disable:[/dim] hermes plugins enable/disable <name>")
|
console.print("[dim]Enable/disable:[/dim] hermes plugins enable/disable <name>")
|
||||||
console.print("[dim]Plugins are opt-in by default — only 'enabled' plugins load.[/dim]")
|
console.print("[dim]Plugins are opt-in by default — only 'enabled' plugins load.[/dim]")
|
||||||
|
|
||||||
|
if available:
|
||||||
|
official = _list_official_plugins()
|
||||||
|
if official:
|
||||||
|
installed_names = {name for name, *_ in entries}
|
||||||
|
def _is_installed(ident: str) -> bool:
|
||||||
|
dirname = ident.rsplit("/", 1)[-1]
|
||||||
|
# Check both the directory name (langfuse-tracing) and
|
||||||
|
# common underscore variant (langfuse_tracing) since the
|
||||||
|
# installed plugin uses the manifest name, not the dir name.
|
||||||
|
return (dirname in installed_names
|
||||||
|
or dirname.replace("-", "_") in installed_names)
|
||||||
|
not_installed = [(ident, desc) for ident, desc in official
|
||||||
|
if not _is_installed(ident)]
|
||||||
|
if not_installed:
|
||||||
|
console.print()
|
||||||
|
avail_table = Table(title="Official optional plugins (not installed)", show_lines=False)
|
||||||
|
avail_table.add_column("Identifier", style="bold")
|
||||||
|
avail_table.add_column("Description")
|
||||||
|
for ident, desc in not_installed:
|
||||||
|
avail_table.add_row(ident, desc)
|
||||||
|
console.print(avail_table)
|
||||||
|
console.print("[dim]Install:[/dim] hermes plugins install official/<category>/<name>")
|
||||||
|
else:
|
||||||
|
console.print("[dim]All official optional plugins are already installed.[/dim]")
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Provider plugin discovery helpers
|
# Provider plugin discovery helpers
|
||||||
@@ -1270,7 +1428,7 @@ def plugins_command(args) -> None:
|
|||||||
elif action == "disable":
|
elif action == "disable":
|
||||||
cmd_disable(args.name)
|
cmd_disable(args.name)
|
||||||
elif action in ("list", "ls"):
|
elif action in ("list", "ls"):
|
||||||
cmd_list()
|
cmd_list(available=getattr(args, "available", False))
|
||||||
elif action is None:
|
elif action is None:
|
||||||
cmd_toggle()
|
cmd_toggle()
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -425,6 +425,31 @@ TOOL_CATEGORIES = {
|
|||||||
},
|
},
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
|
"langfuse": {
|
||||||
|
"name": "Langfuse Observability",
|
||||||
|
"icon": "📊",
|
||||||
|
"providers": [
|
||||||
|
{
|
||||||
|
"name": "Langfuse Cloud",
|
||||||
|
"tag": "Hosted Langfuse (cloud.langfuse.com)",
|
||||||
|
"env_vars": [
|
||||||
|
{"key": "HERMES_LANGFUSE_PUBLIC_KEY", "prompt": "Langfuse public key (pk-lf-...)", "url": "https://cloud.langfuse.com"},
|
||||||
|
{"key": "HERMES_LANGFUSE_SECRET_KEY", "prompt": "Langfuse secret key (sk-lf-...)", "url": "https://cloud.langfuse.com"},
|
||||||
|
],
|
||||||
|
"post_setup": "langfuse",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Langfuse Self-Hosted",
|
||||||
|
"tag": "Self-hosted Langfuse instance",
|
||||||
|
"env_vars": [
|
||||||
|
{"key": "HERMES_LANGFUSE_PUBLIC_KEY", "prompt": "Langfuse public key (pk-lf-...)"},
|
||||||
|
{"key": "HERMES_LANGFUSE_SECRET_KEY", "prompt": "Langfuse secret key (sk-lf-...)"},
|
||||||
|
{"key": "HERMES_LANGFUSE_BASE_URL", "prompt": "Langfuse server URL (e.g. http://localhost:3000)", "default": "http://localhost:3000"},
|
||||||
|
],
|
||||||
|
"post_setup": "langfuse",
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
# Simple env-var requirements for toolsets NOT in TOOL_CATEGORIES.
|
# Simple env-var requirements for toolsets NOT in TOOL_CATEGORIES.
|
||||||
@@ -567,6 +592,31 @@ def _run_post_setup(post_setup_key: str):
|
|||||||
_print_info(" git submodule update --init --recursive")
|
_print_info(" git submodule update --init --recursive")
|
||||||
_print_info(' uv pip install -e "./tinker-atropos"')
|
_print_info(' uv pip install -e "./tinker-atropos"')
|
||||||
|
|
||||||
|
elif post_setup_key == "langfuse":
|
||||||
|
# Install the langfuse SDK.
|
||||||
|
try:
|
||||||
|
__import__("langfuse")
|
||||||
|
_print_success(" langfuse SDK already installed")
|
||||||
|
except ImportError:
|
||||||
|
import subprocess
|
||||||
|
_print_info(" Installing langfuse SDK...")
|
||||||
|
result = subprocess.run(
|
||||||
|
[sys.executable, "-m", "pip", "install", "langfuse", "--quiet"],
|
||||||
|
capture_output=True, text=True, timeout=120,
|
||||||
|
)
|
||||||
|
if result.returncode == 0:
|
||||||
|
_print_success(" langfuse SDK installed")
|
||||||
|
else:
|
||||||
|
_print_warning(" langfuse SDK install failed — run manually: pip install langfuse")
|
||||||
|
# Install and enable the official optional plugin into ~/.hermes/plugins/.
|
||||||
|
try:
|
||||||
|
from hermes_cli.plugins_cmd import cmd_install as _plugins_install
|
||||||
|
_plugins_install("official/observability/langfuse", enable=True)
|
||||||
|
except SystemExit:
|
||||||
|
pass # cmd_install prints its own errors and calls sys.exit
|
||||||
|
_print_info(" Restart Hermes for tracing to take effect.")
|
||||||
|
_print_info(" Verify: hermes plugins list")
|
||||||
|
|
||||||
|
|
||||||
# ─── Platform / Toolset Helpers ───────────────────────────────────────────────
|
# ─── Platform / Toolset Helpers ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
|||||||
875
optional-plugins/observability/langfuse/__init__.py
Normal file
875
optional-plugins/observability/langfuse/__init__.py
Normal file
@@ -0,0 +1,875 @@
|
|||||||
|
"""langfuse — Hermes plugin for Langfuse observability.
|
||||||
|
|
||||||
|
Traces Hermes conversations, LLM calls, and tool usage to Langfuse.
|
||||||
|
Enable via ``hermes tools`` or by setting HERMES_LANGFUSE_ENABLED=true
|
||||||
|
and the required credentials in ~/.hermes/.env.
|
||||||
|
|
||||||
|
Required env vars (set via ``hermes tools`` or ~/.hermes/.env):
|
||||||
|
HERMES_LANGFUSE_ENABLED - set to "true" to activate tracing
|
||||||
|
HERMES_LANGFUSE_PUBLIC_KEY - Langfuse project public key (pk-lf-...)
|
||||||
|
HERMES_LANGFUSE_SECRET_KEY - Langfuse project secret key (sk-lf-...)
|
||||||
|
HERMES_LANGFUSE_BASE_URL - Langfuse server URL (default: https://cloud.langfuse.com)
|
||||||
|
|
||||||
|
Optional env vars:
|
||||||
|
HERMES_LANGFUSE_ENV - environment tag (e.g. "production", "local")
|
||||||
|
HERMES_LANGFUSE_RELEASE - release/version tag
|
||||||
|
HERMES_LANGFUSE_SAMPLE_RATE - sampling rate 0.0–1.0 (default: 1.0)
|
||||||
|
HERMES_LANGFUSE_MAX_CHARS - max chars per field (default: 12000)
|
||||||
|
HERMES_LANGFUSE_DEBUG - set to "true" for verbose logging
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import Any, Dict, Optional
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
try:
|
||||||
|
from langfuse import Langfuse, propagate_attributes
|
||||||
|
except Exception: # pragma: no cover - fail-open when optional dep is missing
|
||||||
|
Langfuse = None
|
||||||
|
propagate_attributes = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TraceState:
|
||||||
|
trace_id: str
|
||||||
|
root_ctx: Any
|
||||||
|
root_span: Any
|
||||||
|
generations: Dict[str, Any] = field(default_factory=dict)
|
||||||
|
tools: Dict[str, Any] = field(default_factory=dict)
|
||||||
|
turn_tool_calls: list[dict[str, Any]] = field(default_factory=list)
|
||||||
|
last_updated_at: float = field(default_factory=time.time)
|
||||||
|
|
||||||
|
|
||||||
|
_STATE_LOCK = threading.Lock()
|
||||||
|
_TRACE_STATE: Dict[str, TraceState] = {}
|
||||||
|
_LANGFUSE_CLIENT = None
|
||||||
|
_READ_FILE_LINE_RE = re.compile(r"^\s*(\d+)\|(.*)$")
|
||||||
|
_READ_FILE_HEAD_LINES = 25
|
||||||
|
_READ_FILE_TAIL_LINES = 15
|
||||||
|
|
||||||
|
|
||||||
|
def _env(name: str, default: str = "") -> str:
|
||||||
|
return os.environ.get(name, default).strip()
|
||||||
|
|
||||||
|
|
||||||
|
def _env_bool(*names: str) -> bool:
|
||||||
|
for name in names:
|
||||||
|
value = _env(name).lower()
|
||||||
|
if value:
|
||||||
|
return value in {"1", "true", "yes", "on"}
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def _debug_enabled() -> bool:
|
||||||
|
return _env_bool("HERMES_LANGFUSE_DEBUG")
|
||||||
|
|
||||||
|
|
||||||
|
def _debug(message: str) -> None:
|
||||||
|
if _debug_enabled():
|
||||||
|
logger.info("Langfuse tracing: %s", message)
|
||||||
|
|
||||||
|
|
||||||
|
def _is_enabled() -> bool:
|
||||||
|
if Langfuse is None:
|
||||||
|
return False
|
||||||
|
# Primary activation path: config.yaml plugins.langfuse.enabled
|
||||||
|
try:
|
||||||
|
from hermes_cli.config import load_config
|
||||||
|
_cfg = load_config()
|
||||||
|
_plugin_cfg = _cfg.get("plugins", {})
|
||||||
|
if isinstance(_plugin_cfg, dict):
|
||||||
|
_lt_cfg = _plugin_cfg.get("langfuse", {})
|
||||||
|
if isinstance(_lt_cfg, dict) and "enabled" in _lt_cfg:
|
||||||
|
if not _lt_cfg["enabled"]:
|
||||||
|
return False
|
||||||
|
# Explicit enabled=true in config — skip env-var check below
|
||||||
|
public_key = _env("HERMES_LANGFUSE_PUBLIC_KEY") or _env("LANGFUSE_PUBLIC_KEY")
|
||||||
|
secret_key = _env("HERMES_LANGFUSE_SECRET_KEY") or _env("LANGFUSE_SECRET_KEY")
|
||||||
|
return bool(public_key and secret_key)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
# Backward-compat path: HERMES_LANGFUSE_ENABLED env var (legacy .env installs)
|
||||||
|
if not _env_bool("HERMES_LANGFUSE_ENABLED"):
|
||||||
|
return False
|
||||||
|
public_key = _env("HERMES_LANGFUSE_PUBLIC_KEY") or _env("LANGFUSE_PUBLIC_KEY")
|
||||||
|
secret_key = _env("HERMES_LANGFUSE_SECRET_KEY") or _env("LANGFUSE_SECRET_KEY")
|
||||||
|
return bool(public_key and secret_key)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_langfuse() -> Optional[Langfuse]:
|
||||||
|
global _LANGFUSE_CLIENT
|
||||||
|
if not _is_enabled():
|
||||||
|
return None
|
||||||
|
if _LANGFUSE_CLIENT is not None:
|
||||||
|
return _LANGFUSE_CLIENT
|
||||||
|
|
||||||
|
public_key = _env("HERMES_LANGFUSE_PUBLIC_KEY") or _env("LANGFUSE_PUBLIC_KEY")
|
||||||
|
secret_key = _env("HERMES_LANGFUSE_SECRET_KEY") or _env("LANGFUSE_SECRET_KEY")
|
||||||
|
base_url = _env("HERMES_LANGFUSE_BASE_URL") or _env("LANGFUSE_BASE_URL") or "https://cloud.langfuse.com"
|
||||||
|
environment = _env("HERMES_LANGFUSE_ENV") or _env("LANGFUSE_ENV")
|
||||||
|
release = _env("HERMES_LANGFUSE_RELEASE") or _env("LANGFUSE_RELEASE")
|
||||||
|
sample_rate = _env("HERMES_LANGFUSE_SAMPLE_RATE")
|
||||||
|
|
||||||
|
kwargs: Dict[str, Any] = {
|
||||||
|
"public_key": public_key,
|
||||||
|
"secret_key": secret_key,
|
||||||
|
"base_url": base_url,
|
||||||
|
}
|
||||||
|
if environment:
|
||||||
|
kwargs["environment"] = environment
|
||||||
|
if release:
|
||||||
|
kwargs["release"] = release
|
||||||
|
if sample_rate:
|
||||||
|
try:
|
||||||
|
kwargs["sample_rate"] = float(sample_rate)
|
||||||
|
except ValueError:
|
||||||
|
logger.warning("Invalid HERMES_LANGFUSE_SAMPLE_RATE=%r", sample_rate)
|
||||||
|
|
||||||
|
try:
|
||||||
|
_LANGFUSE_CLIENT = Langfuse(**kwargs)
|
||||||
|
except Exception as exc: # pragma: no cover - fail-open
|
||||||
|
logger.warning("Could not initialize Langfuse client: %s", exc)
|
||||||
|
return None
|
||||||
|
|
||||||
|
return _LANGFUSE_CLIENT
|
||||||
|
|
||||||
|
|
||||||
|
def _trace_key(task_id: str, session_id: str) -> str:
|
||||||
|
if task_id:
|
||||||
|
return task_id
|
||||||
|
if session_id:
|
||||||
|
return f"session:{session_id}"
|
||||||
|
return f"thread:{threading.get_ident()}"
|
||||||
|
|
||||||
|
|
||||||
|
def _truncate_text(value: str, max_chars: int) -> str:
|
||||||
|
if len(value) <= max_chars:
|
||||||
|
return value
|
||||||
|
return value[:max_chars] + f"... [truncated {len(value) - max_chars} chars]"
|
||||||
|
|
||||||
|
|
||||||
|
def _maybe_parse_json_string(value: str) -> Any:
|
||||||
|
stripped = value.strip()
|
||||||
|
if len(stripped) < 2 or stripped[0] not in "{[" or stripped[-1] not in "}]":
|
||||||
|
if len(stripped) < 2 or stripped[0] not in "{[":
|
||||||
|
return value
|
||||||
|
try:
|
||||||
|
parsed, idx = json.JSONDecoder().raw_decode(stripped)
|
||||||
|
except Exception:
|
||||||
|
return value
|
||||||
|
if not isinstance(parsed, (dict, list)):
|
||||||
|
return value
|
||||||
|
|
||||||
|
trailing = stripped[idx:].strip()
|
||||||
|
if not trailing:
|
||||||
|
return parsed
|
||||||
|
|
||||||
|
hint_key = "_hint" if trailing.startswith("[Hint:") else "_trailing_text"
|
||||||
|
if isinstance(parsed, dict):
|
||||||
|
merged = dict(parsed)
|
||||||
|
key = hint_key if hint_key not in merged else "_trailing_text"
|
||||||
|
merged[key] = trailing
|
||||||
|
return merged
|
||||||
|
|
||||||
|
return {"data": parsed, hint_key: trailing}
|
||||||
|
|
||||||
|
|
||||||
|
def _looks_like_read_file_payload(value: Any) -> bool:
|
||||||
|
if not isinstance(value, dict):
|
||||||
|
return False
|
||||||
|
content = value.get("content")
|
||||||
|
return (
|
||||||
|
isinstance(content, str)
|
||||||
|
and "total_lines" in value
|
||||||
|
and "file_size" in value
|
||||||
|
and "is_binary" in value
|
||||||
|
and "is_image" in value
|
||||||
|
and not value.get("error")
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_read_file_lines(content: str) -> list[dict[str, Any]]:
|
||||||
|
if not isinstance(content, str) or not content:
|
||||||
|
return []
|
||||||
|
|
||||||
|
lines = []
|
||||||
|
for raw_line in content.splitlines():
|
||||||
|
match = _READ_FILE_LINE_RE.match(raw_line)
|
||||||
|
if not match:
|
||||||
|
return []
|
||||||
|
lines.append({
|
||||||
|
"line": int(match.group(1)),
|
||||||
|
"text": match.group(2),
|
||||||
|
})
|
||||||
|
return lines
|
||||||
|
|
||||||
|
|
||||||
|
def _build_read_file_preview(lines: list[dict[str, Any]]) -> dict[str, Any]:
|
||||||
|
if len(lines) <= (_READ_FILE_HEAD_LINES + _READ_FILE_TAIL_LINES):
|
||||||
|
return {"lines": lines}
|
||||||
|
|
||||||
|
return {
|
||||||
|
"head": lines[:_READ_FILE_HEAD_LINES],
|
||||||
|
"tail": lines[-_READ_FILE_TAIL_LINES:],
|
||||||
|
"omitted_line_count": len(lines) - _READ_FILE_HEAD_LINES - _READ_FILE_TAIL_LINES,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize_read_file_payload(value: dict[str, Any], *, args: Any = None) -> dict[str, Any]:
|
||||||
|
normalized: dict[str, Any] = {}
|
||||||
|
if isinstance(args, dict):
|
||||||
|
path = args.get("path")
|
||||||
|
offset = args.get("offset")
|
||||||
|
limit = args.get("limit")
|
||||||
|
if isinstance(path, str) and path:
|
||||||
|
normalized["path"] = path
|
||||||
|
if isinstance(offset, int):
|
||||||
|
normalized["offset"] = offset
|
||||||
|
if isinstance(limit, int):
|
||||||
|
normalized["limit"] = limit
|
||||||
|
|
||||||
|
lines = _parse_read_file_lines(value.get("content", ""))
|
||||||
|
if lines:
|
||||||
|
normalized["returned_lines"] = {
|
||||||
|
"start": lines[0]["line"],
|
||||||
|
"end": lines[-1]["line"],
|
||||||
|
"count": len(lines),
|
||||||
|
}
|
||||||
|
normalized["content_preview"] = _build_read_file_preview(lines)
|
||||||
|
elif value.get("content"):
|
||||||
|
normalized["content_preview"] = {
|
||||||
|
"text": value.get("content", ""),
|
||||||
|
}
|
||||||
|
|
||||||
|
for key in (
|
||||||
|
"total_lines",
|
||||||
|
"file_size",
|
||||||
|
"truncated",
|
||||||
|
"is_binary",
|
||||||
|
"is_image",
|
||||||
|
"hint",
|
||||||
|
"_warning",
|
||||||
|
"mime_type",
|
||||||
|
"dimensions",
|
||||||
|
"similar_files",
|
||||||
|
"error",
|
||||||
|
):
|
||||||
|
if key in value:
|
||||||
|
normalized[key] = value[key]
|
||||||
|
|
||||||
|
base64_content = value.get("base64_content")
|
||||||
|
if isinstance(base64_content, str) and base64_content:
|
||||||
|
normalized["base64_content"] = {
|
||||||
|
"omitted": True,
|
||||||
|
"length": len(base64_content),
|
||||||
|
}
|
||||||
|
|
||||||
|
return normalized
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize_payload(value: Any, *, tool_name: str = "", args: Any = None) -> Any:
|
||||||
|
if _looks_like_read_file_payload(value):
|
||||||
|
return _normalize_read_file_payload(
|
||||||
|
value,
|
||||||
|
args=args if tool_name == "read_file" else None,
|
||||||
|
)
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
|
def _safe_value(value: Any, *, max_chars: Optional[int] = None, depth: int = 0,
|
||||||
|
parse_json_strings: bool = False) -> Any:
|
||||||
|
max_chars = max_chars if max_chars is not None else int(_env("HERMES_LANGFUSE_MAX_CHARS", "12000") or "12000")
|
||||||
|
if depth > 4:
|
||||||
|
return "<max-depth>"
|
||||||
|
if value is None or isinstance(value, (int, float, bool)):
|
||||||
|
return value
|
||||||
|
if isinstance(value, bytes):
|
||||||
|
return {"type": "bytes", "len": len(value)}
|
||||||
|
if isinstance(value, str):
|
||||||
|
if parse_json_strings:
|
||||||
|
parsed = _maybe_parse_json_string(value)
|
||||||
|
if parsed is not value:
|
||||||
|
return _safe_value(parsed, max_chars=max_chars, depth=depth, parse_json_strings=True)
|
||||||
|
return _truncate_text(value, max_chars)
|
||||||
|
if isinstance(value, dict):
|
||||||
|
normalized = _normalize_payload(value)
|
||||||
|
if normalized is not value:
|
||||||
|
return _safe_value(normalized, max_chars=max_chars, depth=depth, parse_json_strings=parse_json_strings)
|
||||||
|
return {
|
||||||
|
str(k): _safe_value(v, max_chars=max_chars, depth=depth + 1, parse_json_strings=parse_json_strings)
|
||||||
|
for k, v in list(value.items())[:50]
|
||||||
|
}
|
||||||
|
if isinstance(value, (list, tuple, set)):
|
||||||
|
return [
|
||||||
|
_safe_value(v, max_chars=max_chars, depth=depth + 1, parse_json_strings=parse_json_strings)
|
||||||
|
for v in list(value)[:50]
|
||||||
|
]
|
||||||
|
if hasattr(value, "__dict__"):
|
||||||
|
return _safe_value(vars(value), max_chars=max_chars, depth=depth + 1, parse_json_strings=parse_json_strings)
|
||||||
|
return _truncate_text(repr(value), max_chars)
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_last_user_message(messages: Any) -> Any:
|
||||||
|
if not isinstance(messages, list):
|
||||||
|
return None
|
||||||
|
for message in reversed(messages):
|
||||||
|
if isinstance(message, dict) and message.get("role") == "user":
|
||||||
|
return {
|
||||||
|
"role": "user",
|
||||||
|
"content": _safe_value(message.get("content")),
|
||||||
|
}
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _serialize_messages(messages: Any) -> list[dict[str, Any]]:
|
||||||
|
if not isinstance(messages, list):
|
||||||
|
return []
|
||||||
|
serialized = []
|
||||||
|
for message in messages[-12:]:
|
||||||
|
if not isinstance(message, dict):
|
||||||
|
continue
|
||||||
|
role = message.get("role")
|
||||||
|
item = {
|
||||||
|
"role": role,
|
||||||
|
"content": _safe_value(
|
||||||
|
message.get("content"),
|
||||||
|
parse_json_strings=(role == "tool"),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
if role == "tool" and message.get("tool_call_id"):
|
||||||
|
item["tool_call_id"] = message.get("tool_call_id")
|
||||||
|
if message.get("tool_calls"):
|
||||||
|
item["tool_calls"] = _safe_value(message.get("tool_calls"), parse_json_strings=True)
|
||||||
|
serialized.append(item)
|
||||||
|
return serialized
|
||||||
|
|
||||||
|
|
||||||
|
def _serialize_tool_calls(tool_calls: Any) -> list[dict[str, Any]]:
|
||||||
|
if not tool_calls:
|
||||||
|
return []
|
||||||
|
serialized = []
|
||||||
|
for tool_call in tool_calls:
|
||||||
|
fn = getattr(tool_call, "function", None)
|
||||||
|
name = getattr(fn, "name", None) if fn else None
|
||||||
|
arguments = getattr(fn, "arguments", None) if fn else None
|
||||||
|
if isinstance(arguments, str):
|
||||||
|
try:
|
||||||
|
arguments = json.loads(arguments)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
serialized.append({
|
||||||
|
"id": getattr(tool_call, "id", None),
|
||||||
|
"name": name,
|
||||||
|
"arguments": _safe_value(arguments, parse_json_strings=True),
|
||||||
|
})
|
||||||
|
return serialized
|
||||||
|
|
||||||
|
|
||||||
|
def _serialize_assistant_message(message: Any) -> dict[str, Any]:
|
||||||
|
return {
|
||||||
|
"content": _safe_value(getattr(message, "content", None)),
|
||||||
|
"reasoning": _safe_value(getattr(message, "reasoning", None)),
|
||||||
|
"tool_calls": _serialize_tool_calls(getattr(message, "tool_calls", None)),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _usage_and_cost(response: Any, *, provider: str, api_mode: str, model: str, base_url: str) -> tuple[dict[str, int], dict[str, float]]:
|
||||||
|
usage_details: Dict[str, int] = {}
|
||||||
|
cost_details: Dict[str, float] = {}
|
||||||
|
raw_usage = getattr(response, "usage", None)
|
||||||
|
if not raw_usage:
|
||||||
|
return usage_details, cost_details
|
||||||
|
|
||||||
|
try:
|
||||||
|
from agent.usage_pricing import estimate_usage_cost, normalize_usage
|
||||||
|
|
||||||
|
canonical = normalize_usage(raw_usage, provider=provider, api_mode=api_mode)
|
||||||
|
# Langfuse usage_details keys follow a naming convention:
|
||||||
|
# - Dashboard sums all keys containing "input" as input total
|
||||||
|
# - Dashboard sums all keys containing "output" as output total
|
||||||
|
# - If no "total" key, Langfuse derives it from all usage types
|
||||||
|
# Use Anthropic-style key names so cache tokens roll into the
|
||||||
|
# dashboard input total automatically.
|
||||||
|
# Ref: https://langfuse.com/docs/model-usage-and-cost
|
||||||
|
usage_details = {
|
||||||
|
"input": canonical.input_tokens,
|
||||||
|
"output": canonical.output_tokens,
|
||||||
|
}
|
||||||
|
if canonical.cache_read_tokens:
|
||||||
|
usage_details["cache_read_input_tokens"] = canonical.cache_read_tokens
|
||||||
|
if canonical.cache_write_tokens:
|
||||||
|
usage_details["cache_creation_input_tokens"] = canonical.cache_write_tokens
|
||||||
|
if canonical.reasoning_tokens:
|
||||||
|
usage_details["reasoning_tokens"] = canonical.reasoning_tokens
|
||||||
|
cost = estimate_usage_cost(
|
||||||
|
model,
|
||||||
|
canonical,
|
||||||
|
provider=provider,
|
||||||
|
base_url=base_url,
|
||||||
|
api_key="",
|
||||||
|
)
|
||||||
|
if cost.amount_usd is not None:
|
||||||
|
# Langfuse cost_details keys must match usage_details keys.
|
||||||
|
# Provide per-type breakdown so dashboard can show cost by type.
|
||||||
|
try:
|
||||||
|
from agent.usage_pricing import get_pricing_entry
|
||||||
|
from decimal import Decimal
|
||||||
|
_ONE_M = Decimal("1000000")
|
||||||
|
entry = get_pricing_entry(model, provider=provider, base_url=base_url)
|
||||||
|
if entry:
|
||||||
|
if entry.input_cost_per_million is not None and canonical.input_tokens:
|
||||||
|
cost_details["input"] = float(Decimal(canonical.input_tokens) * entry.input_cost_per_million / _ONE_M)
|
||||||
|
if entry.output_cost_per_million is not None and canonical.output_tokens:
|
||||||
|
cost_details["output"] = float(Decimal(canonical.output_tokens) * entry.output_cost_per_million / _ONE_M)
|
||||||
|
if entry.cache_read_cost_per_million is not None and canonical.cache_read_tokens:
|
||||||
|
cost_details["cache_read_input_tokens"] = float(Decimal(canonical.cache_read_tokens) * entry.cache_read_cost_per_million / _ONE_M)
|
||||||
|
if entry.cache_write_cost_per_million is not None and canonical.cache_write_tokens:
|
||||||
|
cost_details["cache_creation_input_tokens"] = float(Decimal(canonical.cache_write_tokens) * entry.cache_write_cost_per_million / _ONE_M)
|
||||||
|
else:
|
||||||
|
cost_details["total"] = float(cost.amount_usd)
|
||||||
|
except Exception:
|
||||||
|
cost_details["total"] = float(cost.amount_usd)
|
||||||
|
except Exception as exc: # pragma: no cover - fail-open
|
||||||
|
_debug(f"usage normalization failed: {exc}")
|
||||||
|
|
||||||
|
return usage_details, cost_details
|
||||||
|
|
||||||
|
|
||||||
|
def _start_root_trace(task_key: str, *, task_id: str, session_id: str, platform: str, provider: str, model: str,
|
||||||
|
api_mode: str, messages: Any, client: Langfuse) -> TraceState:
|
||||||
|
trace_id = client.create_trace_id(seed=f"{session_id or 'sessionless'}::{task_id or task_key}")
|
||||||
|
trace_input = _extract_last_user_message(messages)
|
||||||
|
metadata = {
|
||||||
|
"source": "hermes",
|
||||||
|
"task_id": task_id,
|
||||||
|
"platform": platform,
|
||||||
|
"provider": provider,
|
||||||
|
"model": model,
|
||||||
|
"api_mode": api_mode,
|
||||||
|
}
|
||||||
|
|
||||||
|
# session_id must be passed in trace_context for Langfuse session grouping.
|
||||||
|
trace_ctx: Dict[str, Any] = {"trace_id": trace_id}
|
||||||
|
if session_id:
|
||||||
|
trace_ctx["session_id"] = session_id
|
||||||
|
|
||||||
|
if propagate_attributes is not None:
|
||||||
|
try:
|
||||||
|
with propagate_attributes(
|
||||||
|
session_id=session_id or task_key,
|
||||||
|
trace_name="Hermes turn",
|
||||||
|
tags=["hermes", "langfuse"],
|
||||||
|
):
|
||||||
|
root_ctx = client.start_as_current_observation(
|
||||||
|
trace_context=trace_ctx,
|
||||||
|
name="Hermes turn",
|
||||||
|
as_type="chain",
|
||||||
|
input=trace_input,
|
||||||
|
metadata=metadata,
|
||||||
|
end_on_exit=False,
|
||||||
|
)
|
||||||
|
root_span = root_ctx.__enter__()
|
||||||
|
except Exception:
|
||||||
|
root_ctx = client.start_as_current_observation(
|
||||||
|
trace_context=trace_ctx,
|
||||||
|
name="Hermes turn",
|
||||||
|
as_type="chain",
|
||||||
|
input=trace_input,
|
||||||
|
metadata=metadata,
|
||||||
|
end_on_exit=False,
|
||||||
|
)
|
||||||
|
root_span = root_ctx.__enter__()
|
||||||
|
else:
|
||||||
|
root_ctx = client.start_as_current_observation(
|
||||||
|
trace_context=trace_ctx,
|
||||||
|
name="Hermes turn",
|
||||||
|
as_type="chain",
|
||||||
|
input=trace_input,
|
||||||
|
metadata=metadata,
|
||||||
|
end_on_exit=False,
|
||||||
|
)
|
||||||
|
root_span = root_ctx.__enter__()
|
||||||
|
|
||||||
|
try:
|
||||||
|
root_span.set_trace_io(input=trace_input)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
_debug(f"started trace {trace_id} for {task_key}")
|
||||||
|
return TraceState(trace_id=trace_id, root_ctx=root_ctx, root_span=root_span)
|
||||||
|
|
||||||
|
|
||||||
|
def _start_child_observation(state: TraceState, *, client: Langfuse, name: str, as_type: str,
|
||||||
|
input_value: Any, metadata: Optional[dict] = None,
|
||||||
|
model: Optional[str] = None, model_parameters: Optional[dict] = None) -> Any:
|
||||||
|
return state.root_span.start_observation(
|
||||||
|
name=name,
|
||||||
|
as_type=as_type,
|
||||||
|
input=input_value,
|
||||||
|
metadata=metadata or {},
|
||||||
|
model=model,
|
||||||
|
model_parameters=model_parameters,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _end_observation(observation: Any, *, output: Any = None, metadata: Optional[dict] = None,
|
||||||
|
usage_details: Optional[dict] = None, cost_details: Optional[dict] = None) -> None:
|
||||||
|
if observation is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
update_kwargs: Dict[str, Any] = {}
|
||||||
|
if output is not None:
|
||||||
|
update_kwargs["output"] = output
|
||||||
|
if metadata:
|
||||||
|
update_kwargs["metadata"] = metadata
|
||||||
|
if usage_details:
|
||||||
|
update_kwargs["usage_details"] = usage_details
|
||||||
|
if cost_details:
|
||||||
|
update_kwargs["cost_details"] = cost_details
|
||||||
|
if update_kwargs:
|
||||||
|
observation.update(**update_kwargs)
|
||||||
|
observation.end()
|
||||||
|
except Exception as exc: # pragma: no cover - fail-open
|
||||||
|
_debug(f"end observation failed: {exc}")
|
||||||
|
|
||||||
|
|
||||||
|
def _merge_trace_output(output: Any, state: TraceState) -> Any:
|
||||||
|
if not state.turn_tool_calls:
|
||||||
|
return output
|
||||||
|
|
||||||
|
merged = dict(output) if isinstance(output, dict) else {"content": output}
|
||||||
|
merged["tool_calls"] = list(state.turn_tool_calls)
|
||||||
|
return merged
|
||||||
|
|
||||||
|
|
||||||
|
def _finish_trace(task_key: str, *, output: Any = None) -> None:
|
||||||
|
client = _get_langfuse()
|
||||||
|
if client is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
with _STATE_LOCK:
|
||||||
|
state = _TRACE_STATE.pop(task_key, None)
|
||||||
|
if state is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
for observation in state.generations.values():
|
||||||
|
_end_observation(observation)
|
||||||
|
for observation in state.tools.values():
|
||||||
|
_end_observation(observation)
|
||||||
|
final_output = _merge_trace_output(output, state)
|
||||||
|
if final_output is not None:
|
||||||
|
state.root_span.set_trace_io(output=final_output)
|
||||||
|
state.root_span.update(output=final_output)
|
||||||
|
state.root_span.end()
|
||||||
|
except Exception as exc: # pragma: no cover - fail-open
|
||||||
|
_debug(f"finish trace failed: {exc}")
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
client.flush()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _assistant_has_tool_calls(message: Any) -> bool:
|
||||||
|
return bool(getattr(message, "tool_calls", None))
|
||||||
|
|
||||||
|
|
||||||
|
def _request_key(api_call_count: Any) -> str:
|
||||||
|
return str(api_call_count or 0)
|
||||||
|
|
||||||
|
|
||||||
|
def on_pre_llm_call(*, task_id: str = "", session_id: str = "", platform: str = "", model: str = "",
|
||||||
|
provider: str = "", base_url: str = "", api_mode: str = "",
|
||||||
|
api_call_count: int = 0, messages: Any = None, turn_type: str = "user",
|
||||||
|
conversation_history: Any = None, user_message: Any = None, **_: Any) -> None:
|
||||||
|
# Older Hermes branches used pre_llm_call for request-scoped tracing and
|
||||||
|
# passed the actual API messages. Current Hermes also has a turn-scoped
|
||||||
|
# pre_llm_call used for context injection; tracing that hook creates an
|
||||||
|
# extra orphan/root trace before the real request trace. Only trace the
|
||||||
|
# legacy request-shaped call here.
|
||||||
|
if not isinstance(messages, list):
|
||||||
|
return
|
||||||
|
|
||||||
|
client = _get_langfuse()
|
||||||
|
if client is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
# messages is a list only for legacy Hermes branches that fired
|
||||||
|
# pre_llm_call with API messages directly. Current Hermes fires
|
||||||
|
# pre_llm_call for context injection (conversation_history/user_message,
|
||||||
|
# no messages list) — tracing that would create orphan traces.
|
||||||
|
task_key = _trace_key(task_id, session_id)
|
||||||
|
|
||||||
|
with _STATE_LOCK:
|
||||||
|
state = _TRACE_STATE.get(task_key)
|
||||||
|
if state is None:
|
||||||
|
state = _start_root_trace(
|
||||||
|
task_key,
|
||||||
|
task_id=task_id,
|
||||||
|
session_id=session_id,
|
||||||
|
platform=platform,
|
||||||
|
provider=provider,
|
||||||
|
model=model,
|
||||||
|
api_mode=api_mode,
|
||||||
|
messages=messages,
|
||||||
|
client=client,
|
||||||
|
)
|
||||||
|
_TRACE_STATE[task_key] = state
|
||||||
|
state.last_updated_at = time.time()
|
||||||
|
|
||||||
|
|
||||||
|
def on_pre_llm_request(
|
||||||
|
*,
|
||||||
|
task_id: str = "",
|
||||||
|
session_id: str = "",
|
||||||
|
platform: str = "",
|
||||||
|
model: str = "",
|
||||||
|
provider: str = "",
|
||||||
|
base_url: str = "",
|
||||||
|
api_mode: str = "",
|
||||||
|
api_call_count: int = 0,
|
||||||
|
messages: Any = None,
|
||||||
|
turn_type: str = "user",
|
||||||
|
message_count: int = 0,
|
||||||
|
tool_count: int = 0,
|
||||||
|
approx_input_tokens: int = 0,
|
||||||
|
request_char_count: int = 0,
|
||||||
|
max_tokens: Any = None,
|
||||||
|
**_: Any,
|
||||||
|
) -> None:
|
||||||
|
client = _get_langfuse()
|
||||||
|
if client is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
task_key = _trace_key(task_id, session_id)
|
||||||
|
req_key = _request_key(api_call_count)
|
||||||
|
|
||||||
|
with _STATE_LOCK:
|
||||||
|
state = _TRACE_STATE.get(task_key)
|
||||||
|
if state is None:
|
||||||
|
state = _start_root_trace(
|
||||||
|
task_key,
|
||||||
|
task_id=task_id,
|
||||||
|
session_id=session_id,
|
||||||
|
platform=platform,
|
||||||
|
provider=provider,
|
||||||
|
model=model,
|
||||||
|
api_mode=api_mode,
|
||||||
|
messages=messages,
|
||||||
|
client=client,
|
||||||
|
)
|
||||||
|
_TRACE_STATE[task_key] = state
|
||||||
|
state.last_updated_at = time.time()
|
||||||
|
previous = state.generations.pop(req_key, None)
|
||||||
|
if previous is not None:
|
||||||
|
_end_observation(previous)
|
||||||
|
state.generations[req_key] = _start_child_observation(
|
||||||
|
state,
|
||||||
|
client=client,
|
||||||
|
name=f"LLM call {api_call_count}",
|
||||||
|
as_type="generation",
|
||||||
|
input_value=_serialize_messages(messages),
|
||||||
|
metadata={
|
||||||
|
"provider": provider,
|
||||||
|
"platform": platform,
|
||||||
|
"api_mode": api_mode,
|
||||||
|
"base_url": base_url,
|
||||||
|
},
|
||||||
|
model=model,
|
||||||
|
model_parameters={"api_mode": api_mode, "provider": provider},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def on_post_llm_call(*, task_id: str = "", session_id: str = "", provider: str = "", base_url: str = "",
|
||||||
|
api_mode: str = "", model: str = "", api_call_count: int = 0,
|
||||||
|
assistant_message: Any = None, response: Any = None,
|
||||||
|
api_duration: float = 0.0, finish_reason: str = "",
|
||||||
|
usage: Any = None, assistant_content_chars: int = 0,
|
||||||
|
assistant_tool_call_count: int = 0, assistant_response: Any = None,
|
||||||
|
**_: Any) -> None:
|
||||||
|
client = _get_langfuse()
|
||||||
|
if client is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
task_key = _trace_key(task_id, session_id)
|
||||||
|
req_key = _request_key(api_call_count)
|
||||||
|
|
||||||
|
with _STATE_LOCK:
|
||||||
|
state = _TRACE_STATE.get(task_key)
|
||||||
|
generation = state.generations.pop(req_key, None) if state else None
|
||||||
|
if state is None or generation is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Handle both call patterns:
|
||||||
|
# 1. post_api_request: passes usage (dict), assistant_content_chars, assistant_tool_call_count
|
||||||
|
# 2. post_llm_call: passes assistant_message (object), response (object), assistant_response (str)
|
||||||
|
if assistant_message is not None:
|
||||||
|
output = _serialize_assistant_message(assistant_message)
|
||||||
|
elif assistant_response is not None:
|
||||||
|
# post_llm_call passes assistant_response as a plain string
|
||||||
|
output = {"content": _safe_value(assistant_response), "reasoning": None, "tool_calls": []}
|
||||||
|
else:
|
||||||
|
# post_api_request path — reconstruct from summary kwargs
|
||||||
|
output = {
|
||||||
|
"content": f"[{assistant_content_chars} chars]" if assistant_content_chars else None,
|
||||||
|
"reasoning": None,
|
||||||
|
"tool_calls": [{"id": f"tc_{i}"} for i in range(assistant_tool_call_count)] if assistant_tool_call_count else [],
|
||||||
|
}
|
||||||
|
|
||||||
|
if output.get("tool_calls"):
|
||||||
|
state.turn_tool_calls.extend(output["tool_calls"])
|
||||||
|
|
||||||
|
# Extract usage: prefer response object, fall back to usage dict from post_api_request
|
||||||
|
if response is not None:
|
||||||
|
usage_details, cost_details = _usage_and_cost(
|
||||||
|
response,
|
||||||
|
provider=provider,
|
||||||
|
api_mode=api_mode,
|
||||||
|
model=model,
|
||||||
|
base_url=base_url,
|
||||||
|
)
|
||||||
|
elif isinstance(usage, dict) and usage:
|
||||||
|
# post_api_request passes a pre-built CanonicalUsage summary dict.
|
||||||
|
# Use Langfuse-convention key names: "input", "output", and
|
||||||
|
# "cache_read_input_tokens" / "cache_creation_input_tokens" so the
|
||||||
|
# dashboard sums cache tokens into the input total automatically.
|
||||||
|
_input = usage.get("input_tokens", 0)
|
||||||
|
_output = usage.get("output_tokens", 0) or usage.get("completion_tokens", 0)
|
||||||
|
_cache_read = usage.get("cache_read_tokens", 0)
|
||||||
|
_cache_write = usage.get("cache_write_tokens", 0)
|
||||||
|
_reasoning = usage.get("reasoning_tokens", 0)
|
||||||
|
usage_details = {
|
||||||
|
"input": _input,
|
||||||
|
"output": _output,
|
||||||
|
}
|
||||||
|
if _cache_read:
|
||||||
|
usage_details["cache_read_input_tokens"] = _cache_read
|
||||||
|
if _cache_write:
|
||||||
|
usage_details["cache_creation_input_tokens"] = _cache_write
|
||||||
|
if _reasoning:
|
||||||
|
usage_details["reasoning_tokens"] = _reasoning
|
||||||
|
cost_details = {}
|
||||||
|
# Estimate per-type cost from the summary if possible
|
||||||
|
try:
|
||||||
|
from agent.usage_pricing import CanonicalUsage, estimate_usage_cost, get_pricing_entry
|
||||||
|
from decimal import Decimal
|
||||||
|
_ONE_M = Decimal("1000000")
|
||||||
|
_cu = CanonicalUsage(
|
||||||
|
input_tokens=_input,
|
||||||
|
output_tokens=_output,
|
||||||
|
cache_read_tokens=_cache_read,
|
||||||
|
cache_write_tokens=_cache_write,
|
||||||
|
reasoning_tokens=_reasoning,
|
||||||
|
)
|
||||||
|
entry = get_pricing_entry(model, provider=provider, base_url=base_url)
|
||||||
|
if entry:
|
||||||
|
if entry.input_cost_per_million is not None and _input:
|
||||||
|
cost_details["input"] = float(Decimal(_input) * entry.input_cost_per_million / _ONE_M)
|
||||||
|
if entry.output_cost_per_million is not None and _output:
|
||||||
|
cost_details["output"] = float(Decimal(_output) * entry.output_cost_per_million / _ONE_M)
|
||||||
|
if entry.cache_read_cost_per_million is not None and _cache_read:
|
||||||
|
cost_details["cache_read_input_tokens"] = float(Decimal(_cache_read) * entry.cache_read_cost_per_million / _ONE_M)
|
||||||
|
if entry.cache_write_cost_per_million is not None and _cache_write:
|
||||||
|
cost_details["cache_creation_input_tokens"] = float(Decimal(_cache_write) * entry.cache_write_cost_per_million / _ONE_M)
|
||||||
|
else:
|
||||||
|
_cost = estimate_usage_cost(model, _cu, provider=provider, base_url=base_url, api_key="")
|
||||||
|
if _cost.amount_usd is not None:
|
||||||
|
cost_details["total"] = float(_cost.amount_usd)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
usage_details, cost_details = {}, {}
|
||||||
|
|
||||||
|
tool_count = len(output.get("tool_calls", [])) or assistant_tool_call_count
|
||||||
|
gen_metadata: Dict[str, Any] = {"tool_call_count": tool_count}
|
||||||
|
if api_duration and api_duration > 0:
|
||||||
|
gen_metadata["api_duration_s"] = round(api_duration, 3)
|
||||||
|
if finish_reason:
|
||||||
|
gen_metadata["finish_reason"] = finish_reason
|
||||||
|
_end_observation(
|
||||||
|
generation,
|
||||||
|
output=output,
|
||||||
|
usage_details=usage_details,
|
||||||
|
cost_details=cost_details,
|
||||||
|
metadata=gen_metadata,
|
||||||
|
)
|
||||||
|
|
||||||
|
has_tools = _assistant_has_tool_calls(assistant_message) if assistant_message else (assistant_tool_call_count > 0)
|
||||||
|
has_content = bool(output.get("content"))
|
||||||
|
if not has_tools and has_content:
|
||||||
|
_finish_trace(task_key, output=output)
|
||||||
|
|
||||||
|
|
||||||
|
def on_pre_tool_call(*, tool_name: str = "", args: Any = None, task_id: str = "",
|
||||||
|
session_id: str = "", tool_call_id: str = "", **_: Any) -> None:
|
||||||
|
client = _get_langfuse()
|
||||||
|
if client is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
task_key = _trace_key(task_id, session_id)
|
||||||
|
tool_key = tool_call_id or f"{tool_name}:{time.time_ns()}"
|
||||||
|
|
||||||
|
with _STATE_LOCK:
|
||||||
|
state = _TRACE_STATE.get(task_key)
|
||||||
|
if state is None:
|
||||||
|
return
|
||||||
|
state.tools[tool_key] = _start_child_observation(
|
||||||
|
state,
|
||||||
|
client=client,
|
||||||
|
name=f"Tool: {tool_name}",
|
||||||
|
as_type="tool",
|
||||||
|
input_value=_safe_value(args),
|
||||||
|
metadata={"tool_name": tool_name, "tool_call_id": tool_call_id},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def on_post_tool_call(*, tool_name: str = "", args: Any = None, result: Any = None,
|
||||||
|
task_id: str = "", session_id: str = "", tool_call_id: str = "", **_: Any) -> None:
|
||||||
|
task_key = _trace_key(task_id, session_id)
|
||||||
|
tool_key = tool_call_id or ""
|
||||||
|
observation = None
|
||||||
|
|
||||||
|
with _STATE_LOCK:
|
||||||
|
state = _TRACE_STATE.get(task_key)
|
||||||
|
if state is None:
|
||||||
|
return
|
||||||
|
if tool_key:
|
||||||
|
observation = state.tools.pop(tool_key, None)
|
||||||
|
elif state.tools:
|
||||||
|
_, observation = state.tools.popitem()
|
||||||
|
|
||||||
|
if observation is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
if isinstance(result, str):
|
||||||
|
result_value = _maybe_parse_json_string(result)
|
||||||
|
else:
|
||||||
|
result_value = result
|
||||||
|
result_value = _normalize_payload(result_value, tool_name=tool_name, args=args)
|
||||||
|
|
||||||
|
_end_observation(
|
||||||
|
observation,
|
||||||
|
output=_safe_value(result_value, parse_json_strings=True),
|
||||||
|
metadata={"tool_name": tool_name, "args": _safe_value(args, parse_json_strings=True)},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def register(ctx) -> None:
|
||||||
|
# Register for both hook name variants so the plugin works across
|
||||||
|
# Hermes versions. pre_api_request / post_api_request fire per API
|
||||||
|
# call (preferred); pre_llm_call / post_llm_call fire once per turn.
|
||||||
|
ctx.register_hook("pre_api_request", on_pre_llm_request)
|
||||||
|
ctx.register_hook("post_api_request", on_post_llm_call)
|
||||||
|
ctx.register_hook("pre_llm_call", on_pre_llm_call)
|
||||||
|
ctx.register_hook("post_llm_call", on_post_llm_call)
|
||||||
|
ctx.register_hook("pre_tool_call", on_pre_tool_call)
|
||||||
|
ctx.register_hook("post_tool_call", on_post_tool_call)
|
||||||
38
optional-plugins/observability/langfuse/after-install.md
Normal file
38
optional-plugins/observability/langfuse/after-install.md
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
# After installing langfuse
|
||||||
|
|
||||||
|
Langfuse tracing is now installed and enabled for your Hermes profile.
|
||||||
|
|
||||||
|
## Required credentials
|
||||||
|
|
||||||
|
Set these in `~/.hermes/.env` (or via `hermes tools` → Langfuse Observability):
|
||||||
|
|
||||||
|
```bash
|
||||||
|
HERMES_LANGFUSE_PUBLIC_KEY=pk-lf-...
|
||||||
|
HERMES_LANGFUSE_SECRET_KEY=sk-lf-...
|
||||||
|
HERMES_LANGFUSE_BASE_URL=https://cloud.langfuse.com # or your self-hosted URL
|
||||||
|
```
|
||||||
|
|
||||||
|
## Verify
|
||||||
|
|
||||||
|
```bash
|
||||||
|
hermes plugins list # langfuse should appear as enabled
|
||||||
|
hermes chat -q "hello" # then check Langfuse for a "Hermes turn" trace
|
||||||
|
```
|
||||||
|
|
||||||
|
## Optional settings
|
||||||
|
|
||||||
|
```bash
|
||||||
|
HERMES_LANGFUSE_ENV=production # environment tag
|
||||||
|
HERMES_LANGFUSE_RELEASE=v1.0.0 # release tag
|
||||||
|
HERMES_LANGFUSE_SAMPLE_RATE=0.5 # sample 50% of traces
|
||||||
|
HERMES_LANGFUSE_MAX_CHARS=12000 # max chars per field (default: 12000)
|
||||||
|
HERMES_LANGFUSE_DEBUG=true # verbose plugin logging
|
||||||
|
```
|
||||||
|
|
||||||
|
## Dependencies
|
||||||
|
|
||||||
|
The `langfuse` Python SDK is required. Install it into your Hermes venv:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
pip install langfuse
|
||||||
|
```
|
||||||
14
optional-plugins/observability/langfuse/plugin.yaml
Normal file
14
optional-plugins/observability/langfuse/plugin.yaml
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
name: langfuse
|
||||||
|
version: "1.0.0"
|
||||||
|
description: "Optional Langfuse observability for Hermes — traces conversations, LLM calls, and tool usage. Install via: hermes plugins install official/observability/langfuse"
|
||||||
|
author: NousResearch
|
||||||
|
requires_env:
|
||||||
|
- HERMES_LANGFUSE_PUBLIC_KEY
|
||||||
|
- HERMES_LANGFUSE_SECRET_KEY
|
||||||
|
hooks:
|
||||||
|
- pre_api_request
|
||||||
|
- post_api_request
|
||||||
|
- pre_llm_call
|
||||||
|
- post_llm_call
|
||||||
|
- pre_tool_call
|
||||||
|
- post_tool_call
|
||||||
168
tests/hermes_cli/test_optional_plugins.py
Normal file
168
tests/hermes_cli/test_optional_plugins.py
Normal file
@@ -0,0 +1,168 @@
|
|||||||
|
"""Tests for optional-plugins (official) install path in plugins_cmd."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _make_official_plugin_dir(tmp_path: Path, category: str, name: str) -> Path:
|
||||||
|
"""Create a minimal optional-plugin directory structure."""
|
||||||
|
plugin_dir = tmp_path / "optional-plugins" / category / name
|
||||||
|
plugin_dir.mkdir(parents=True)
|
||||||
|
(plugin_dir / "plugin.yaml").write_text(
|
||||||
|
f"name: {name}\nversion: 1.0.0\ndescription: Test plugin\n"
|
||||||
|
)
|
||||||
|
(plugin_dir / "__init__.py").write_text("def register(ctx): pass\n")
|
||||||
|
return plugin_dir
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# _resolve_official_plugin
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestResolveOfficialPlugin:
|
||||||
|
def test_returns_none_for_git_url(self, tmp_path):
|
||||||
|
from hermes_cli.plugins_cmd import _resolve_official_plugin
|
||||||
|
with patch("hermes_cli.plugins_cmd._optional_plugins_dir", return_value=tmp_path / "optional-plugins"):
|
||||||
|
result = _resolve_official_plugin("https://github.com/owner/repo.git")
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_returns_none_for_owner_repo(self, tmp_path):
|
||||||
|
from hermes_cli.plugins_cmd import _resolve_official_plugin
|
||||||
|
with patch("hermes_cli.plugins_cmd._optional_plugins_dir", return_value=tmp_path / "optional-plugins"):
|
||||||
|
result = _resolve_official_plugin("owner/repo")
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_returns_none_for_missing_plugin(self, tmp_path):
|
||||||
|
from hermes_cli.plugins_cmd import _resolve_official_plugin
|
||||||
|
(tmp_path / "optional-plugins").mkdir()
|
||||||
|
with patch("hermes_cli.plugins_cmd._optional_plugins_dir", return_value=tmp_path / "optional-plugins"):
|
||||||
|
result = _resolve_official_plugin("official/observability/nonexistent")
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_returns_path_for_existing_plugin(self, tmp_path):
|
||||||
|
from hermes_cli.plugins_cmd import _resolve_official_plugin
|
||||||
|
plugin_dir = _make_official_plugin_dir(tmp_path, "observability", "langfuse")
|
||||||
|
with patch("hermes_cli.plugins_cmd._optional_plugins_dir", return_value=tmp_path / "optional-plugins"):
|
||||||
|
result = _resolve_official_plugin("official/observability/langfuse")
|
||||||
|
assert result == plugin_dir
|
||||||
|
|
||||||
|
def test_accepts_without_official_prefix(self, tmp_path):
|
||||||
|
from hermes_cli.plugins_cmd import _resolve_official_plugin
|
||||||
|
plugin_dir = _make_official_plugin_dir(tmp_path, "observability", "langfuse")
|
||||||
|
with patch("hermes_cli.plugins_cmd._optional_plugins_dir", return_value=tmp_path / "optional-plugins"):
|
||||||
|
result = _resolve_official_plugin("observability/langfuse")
|
||||||
|
assert result == plugin_dir
|
||||||
|
|
||||||
|
def test_traversal_blocked(self, tmp_path):
|
||||||
|
from hermes_cli.plugins_cmd import _resolve_official_plugin
|
||||||
|
(tmp_path / "optional-plugins").mkdir()
|
||||||
|
with patch("hermes_cli.plugins_cmd._optional_plugins_dir", return_value=tmp_path / "optional-plugins"):
|
||||||
|
result = _resolve_official_plugin("official/../../etc/passwd")
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# _list_official_plugins
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestListOfficialPlugins:
|
||||||
|
def test_empty_when_no_optional_plugins_dir(self, tmp_path):
|
||||||
|
from hermes_cli.plugins_cmd import _list_official_plugins
|
||||||
|
with patch("hermes_cli.plugins_cmd._optional_plugins_dir", return_value=tmp_path / "nonexistent"):
|
||||||
|
result = _list_official_plugins()
|
||||||
|
assert result == []
|
||||||
|
|
||||||
|
def test_lists_plugins_with_descriptions(self, tmp_path):
|
||||||
|
from hermes_cli.plugins_cmd import _list_official_plugins
|
||||||
|
_make_official_plugin_dir(tmp_path, "observability", "langfuse")
|
||||||
|
_make_official_plugin_dir(tmp_path, "observability", "other-plugin")
|
||||||
|
with patch("hermes_cli.plugins_cmd._optional_plugins_dir", return_value=tmp_path / "optional-plugins"):
|
||||||
|
result = _list_official_plugins()
|
||||||
|
identifiers = [r[0] for r in result]
|
||||||
|
assert "official/observability/langfuse" in identifiers
|
||||||
|
assert "official/observability/other-plugin" in identifiers
|
||||||
|
|
||||||
|
def test_descriptions_parsed_from_yaml(self, tmp_path):
|
||||||
|
from hermes_cli.plugins_cmd import _list_official_plugins
|
||||||
|
plugin_dir = _make_official_plugin_dir(tmp_path, "observability", "langfuse")
|
||||||
|
with patch("hermes_cli.plugins_cmd._optional_plugins_dir", return_value=tmp_path / "optional-plugins"):
|
||||||
|
result = _list_official_plugins()
|
||||||
|
assert any(desc == "Test plugin" for _, desc in result)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# cmd_install — official path
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestCmdInstallOfficial:
|
||||||
|
def test_install_official_plugin_copies_files(self, tmp_path, monkeypatch):
|
||||||
|
from hermes_cli.plugins_cmd import cmd_install
|
||||||
|
plugin_dir = _make_official_plugin_dir(tmp_path, "observability", "langfuse")
|
||||||
|
user_plugins = tmp_path / "user-plugins"
|
||||||
|
user_plugins.mkdir()
|
||||||
|
|
||||||
|
monkeypatch.setattr("hermes_cli.plugins_cmd._optional_plugins_dir",
|
||||||
|
lambda: tmp_path / "optional-plugins")
|
||||||
|
monkeypatch.setattr("hermes_cli.plugins_cmd._plugins_dir",
|
||||||
|
lambda: user_plugins)
|
||||||
|
# Non-interactive: don't prompt
|
||||||
|
monkeypatch.setattr("sys.stdin.isatty", lambda: False)
|
||||||
|
|
||||||
|
cmd_install("official/observability/langfuse", enable=False)
|
||||||
|
|
||||||
|
installed = user_plugins / "langfuse"
|
||||||
|
assert installed.is_dir()
|
||||||
|
assert (installed / "plugin.yaml").exists()
|
||||||
|
assert (installed / "__init__.py").exists()
|
||||||
|
|
||||||
|
def test_install_official_plugin_respects_force(self, tmp_path, monkeypatch):
|
||||||
|
from hermes_cli.plugins_cmd import cmd_install
|
||||||
|
plugin_dir = _make_official_plugin_dir(tmp_path, "observability", "langfuse")
|
||||||
|
user_plugins = tmp_path / "user-plugins"
|
||||||
|
user_plugins.mkdir()
|
||||||
|
# Pre-create to simulate already-installed
|
||||||
|
already = user_plugins / "langfuse"
|
||||||
|
already.mkdir()
|
||||||
|
(already / "old.txt").write_text("old")
|
||||||
|
|
||||||
|
monkeypatch.setattr("hermes_cli.plugins_cmd._optional_plugins_dir",
|
||||||
|
lambda: tmp_path / "optional-plugins")
|
||||||
|
monkeypatch.setattr("hermes_cli.plugins_cmd._plugins_dir",
|
||||||
|
lambda: user_plugins)
|
||||||
|
monkeypatch.setattr("sys.stdin.isatty", lambda: False)
|
||||||
|
|
||||||
|
cmd_install("official/observability/langfuse", force=True, enable=False)
|
||||||
|
|
||||||
|
# Old file should be gone, new files present
|
||||||
|
assert not (already / "old.txt").exists()
|
||||||
|
assert (already / "plugin.yaml").exists()
|
||||||
|
|
||||||
|
def test_install_official_plugin_exits_without_force_when_exists(self, tmp_path, monkeypatch):
|
||||||
|
from hermes_cli.plugins_cmd import cmd_install
|
||||||
|
_make_official_plugin_dir(tmp_path, "observability", "langfuse")
|
||||||
|
user_plugins = tmp_path / "user-plugins"
|
||||||
|
user_plugins.mkdir()
|
||||||
|
(user_plugins / "langfuse").mkdir()
|
||||||
|
|
||||||
|
monkeypatch.setattr("hermes_cli.plugins_cmd._optional_plugins_dir",
|
||||||
|
lambda: tmp_path / "optional-plugins")
|
||||||
|
monkeypatch.setattr("hermes_cli.plugins_cmd._plugins_dir",
|
||||||
|
lambda: user_plugins)
|
||||||
|
|
||||||
|
with pytest.raises(SystemExit):
|
||||||
|
cmd_install("official/observability/langfuse", enable=False)
|
||||||
|
|
||||||
|
def test_git_url_not_mistaken_for_official(self, tmp_path, monkeypatch):
|
||||||
|
"""A git URL must not trigger the official install path."""
|
||||||
|
from hermes_cli.plugins_cmd import _resolve_official_plugin
|
||||||
|
with patch("hermes_cli.plugins_cmd._optional_plugins_dir",
|
||||||
|
return_value=tmp_path / "optional-plugins"):
|
||||||
|
assert _resolve_official_plugin("https://github.com/owner/repo") is None
|
||||||
|
assert _resolve_official_plugin("owner/repo") is None
|
||||||
Reference in New Issue
Block a user