Compare commits

..

1 Commit

Author: teknium1
SHA1: b54591ddda
Message: fix(docker): require explicit env allowlist for container creds
Date: 2026-03-15 10:38:30 -07:00
23 changed files with 177 additions and 2602 deletions

View File

@@ -107,6 +107,12 @@ terminal:
# timeout: 180
# lifetime_seconds: 300
# docker_image: "nikolaik/python-nodejs:python3.11-nodejs20"
# # Optional: explicitly forward selected env vars into Docker.
# # These values come from your current shell first, then ~/.hermes/.env.
# # Warning: anything forwarded here is visible to commands run in the container.
# docker_forward_env:
# - "GITHUB_TOKEN"
# - "NPM_TOKEN"
# -----------------------------------------------------------------------------
# OPTION 4: Singularity/Apptainer container

2
cli.py
View File

@@ -158,6 +158,7 @@ def load_cli_config() -> Dict[str, Any]:
"timeout": 60,
"lifetime_seconds": 300,
"docker_image": "python:3.11",
"docker_forward_env": [],
"singularity_image": "docker://python:3.11",
"modal_image": "python:3.11",
"daytona_image": "nikolaik/python-nodejs:python3.11-nodejs20",
@@ -313,6 +314,7 @@ def load_cli_config() -> Dict[str, Any]:
"timeout": "TERMINAL_TIMEOUT",
"lifetime_seconds": "TERMINAL_LIFETIME_SECONDS",
"docker_image": "TERMINAL_DOCKER_IMAGE",
"docker_forward_env": "TERMINAL_DOCKER_FORWARD_ENV",
"singularity_image": "TERMINAL_SINGULARITY_IMAGE",
"modal_image": "TERMINAL_MODAL_IMAGE",
"daytona_image": "TERMINAL_DAYTONA_IMAGE",

View File

@@ -64,6 +64,7 @@ if _config_path.exists():
"timeout": "TERMINAL_TIMEOUT",
"lifetime_seconds": "TERMINAL_LIFETIME_SECONDS",
"docker_image": "TERMINAL_DOCKER_IMAGE",
"docker_forward_env": "TERMINAL_DOCKER_FORWARD_ENV",
"singularity_image": "TERMINAL_SINGULARITY_IMAGE",
"modal_image": "TERMINAL_MODAL_IMAGE",
"daytona_image": "TERMINAL_DAYTONA_IMAGE",

View File

@@ -106,6 +106,7 @@ DEFAULT_CONFIG = {
"cwd": ".", # Use current directory
"timeout": 180,
"docker_image": "nikolaik/python-nodejs:python3.11-nodejs20",
"docker_forward_env": [],
"singularity_image": "docker://nikolaik/python-nodejs:python3.11-nodejs20",
"modal_image": "nikolaik/python-nodejs:python3.11-nodejs20",
"daytona_image": "nikolaik/python-nodejs:python3.11-nodejs20",
@@ -302,7 +303,7 @@ DEFAULT_CONFIG = {
},
# Config schema version - bump this when adding new required fields
-"_config_version": 8,
+"_config_version": 9,
}
# =============================================================================

View File

@@ -1,193 +0,0 @@
---
name: vercel-obs
description: Investigate Vercel-deployed apps by collecting runtime logs or configuring a drain to a local receiver, correlating the data with the current codebase, and producing bug-focused observability reports.
version: 1.0.0
author: Hermes Agent
license: MIT
metadata:
hermes:
tags: [vercel, observability, logging, debugging, production]
related_skills: [native-mcp]
---
# Vercel Obs
Use this skill when the current app is deployed to Vercel and the user wants a code-aware observability pass over recent runtime logs or a temporary drain-backed capture session.
## Prerequisites
- `vercel` CLI installed and logged in
- Repo linked to a Vercel project, or the user can provide a project id/name
- For one-shot live drain capture: `cloudflared` or `ngrok` installed locally
- Drain support is plan-dependent; if drains are unavailable, fall back to runtime-log analysis
## Helper Script
Installed path:
```bash
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py
```
Read `references/vercel.md` if you need the current Vercel constraints or API assumptions.
## Workflow
### 1. Preflight
Always start with:
```bash
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py preflight
```
This checks for:
- linked Vercel project metadata
- CLI availability and version
- current Vercel login state
- whether `vercel api` is available for drain operations
If the repo is not linked or the CLI is not authenticated, stop and explain the blocker.
### 2. Immediate Runtime Analysis
Use runtime logs first so the user gets signal immediately:
```bash
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py collect-runtime --since 30m
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py analyze --since 30m --report-path .hermes/observability/reports/runtime-report.md
```
This path is the default fallback when drain setup is not possible.
### 3. One-Shot Live Session
For a single prompt workflow, prefer the built-in orchestration command:
```bash
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py live-session \
--minutes 10 \
--environment production \
--report-path .hermes/observability/reports/live-session.md
```
This command will:
- start the local drain receiver
- launch a tunnel with `cloudflared` or `ngrok`
- create a temporary Vercel drain against the linked project
- collect logs for the requested window
- delete the drain
- stop the tunnel and receiver
- analyze only the rows captured during that session
- write a report
Useful flags:
- `--project-id prj_123` if the repo is not linked
- `--scope team_slug` for team-scoped Vercel access
- `--source serverless --source edge-function` to narrow the capture
- `--tunnel cloudflared` to force a provider
- `--name-prefix hermes-incident` to change the temporary drain name prefix
If the tunnel binary is missing or the drain cannot be created, the script should still clean up the local receiver before exiting with an error.
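The cleanup guarantee described above can be sketched with `contextlib.ExitStack`. This is only an illustration under assumptions: `run_with_cleanup` and the `(name, start, stop)` step tuples are hypothetical names, not the helper script's actual structure.

```python
from contextlib import ExitStack
from typing import Callable, List, Tuple

# Hypothetical shape: (label, start callable, stop callable).
Step = Tuple[str, Callable[[], None], Callable[[], None]]


def run_with_cleanup(steps: List[Step], work: Callable[[], None]) -> None:
    """Start each resource in order; always stop the started ones in reverse."""
    with ExitStack() as stack:
        for _name, start, stop in steps:
            start()               # if this raises, earlier steps are still stopped
            stack.callback(stop)  # registered only after a successful start
        work()                    # runs only when every step started
```

If the third step (for example drain creation) raises, the stop callables for the first two still run in reverse order, `work` is never called, and the exception propagates to the caller.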
### 4. Local Receiver for Manual Live Drain Capture
Use the manual steps below only when you need fine-grained control.
Start the receiver in the background:
```bash
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py serve --port 4319 --secret YOUR_SHARED_SECRET
```
Run it through Hermes background process support so it stays alive.
The receiver writes to:
- `.hermes/observability/logs.sqlite3`
- `.hermes/observability/raw/`
### 5. Expose the Receiver
If the receiver is only listening on localhost, expose it with a tunnel before creating the drain.
Preferred manual pattern:
```bash
cloudflared tunnel --url http://127.0.0.1:4319 --no-autoupdate
```
Parse the public HTTPS URL from the tunnel output. If no tunnel is available, explain that Vercel cannot deliver drains to a private localhost endpoint.
### 6. Create or Reuse the Drain
Once you have a public URL:
```bash
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py ensure-drain \
--name hermes-observability \
--target-url https://example.trycloudflare.com \
--project-id prj_123 \
--secret YOUR_SHARED_SECRET \
--source static \
--source serverless \
--source edge-function
```
If the project id is omitted, the script tries `.vercel/project.json`.
For teardown:
```bash
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py delete-drain --drain-id d_123
```
### 7. Analyze and Report
Generate a report after enough logs have arrived:
```bash
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py analyze \
--since 2h \
--sample-limit 20 \
--report-path .hermes/observability/reports/observability-report.md
```
The report should prioritize:
- bug candidates
- noisy or superfluous logs
- missing context in error logs
- concrete fix proposals tied back to likely files in the repo
## Output Expectations
When using this skill, produce:
1. A short status summary of what mode you used: runtime only or drain-backed
2. The report path
3. The highest-signal findings first
4. Concrete next steps, including drain cleanup if you created one
## Hermes Prompt Patterns
Use prompts like:
```text
/vercel-obs Run a 10 minute live observability session for this repo: start live collection, set up the tunnel, create a temporary drain, collect logs, clean everything up, analyze the captured data, and write the report to .hermes/observability/reports/live-session.md.
```
```text
/vercel-obs Run preflight first, then execute a 5 minute live session against production using only serverless and edge-function logs. Summarize the top bug candidates in chat and save the full report under .hermes/observability/reports/incident-review.md.
```
## Guardrails
- Prefer read-only investigation unless the user explicitly asks for fixes
- Redact obvious secrets and tokens in reports
- Keep time windows narrow by default
- Use sampling for high-volume logs
- If drain creation fails, surface the Vercel API error and fall back to runtime-log analysis

View File

@@ -1,40 +0,0 @@
# Vercel Notes
These notes are here so the skill can stay short.
## Current Assumptions
- `vercel logs --json` is the structured runtime-log path for `v1`
- `vercel api` is used for drain CRUD operations
- The drain endpoints are under `/v1/drains`
- The receiver must be publicly reachable for Vercel to deliver drain traffic
- Drain signature verification uses HMAC-SHA1 over the raw request body and compares against `x-vercel-signature`
- For the one-shot `live-session` flow, tunnel setup is automated with `cloudflared` first and `ngrok` as fallback when available
## Practical Defaults
- Use runtime logs first for immediate signal
- Use drains only for live capture or longer windows
- Store normalized logs locally in SQLite for `v1`
- Use the repo root as the code-correlation root
## Useful CLI Commands
```bash
vercel whoami
vercel logs --json --since 30m
vercel api /v1/drains
vercel api /v1/drains -X POST --input payload.json
vercel api /v1/drains/{id} -X DELETE
```
## Suggested Sources
Reasonable first-pass source sets for a general web app:
- `serverless`
- `edge-function`
- `edge-middleware`
- `static`
Tune the sources down if the project is noisy.
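The signature assumption above (HMAC-SHA1 over the raw request body, compared against `x-vercel-signature`) could look roughly like the sketch below. The function name mirrors the `verify_signature` call used in the tests, but the body is an assumption, not the script's actual code.

```python
import hashlib
import hmac


def verify_signature(body: bytes, signature: str, secret: str) -> bool:
    """Check an HMAC-SHA1 hex digest of the raw body against the header value."""
    expected = hmac.new(secret.encode("utf-8"), body, hashlib.sha1).hexdigest()
    # compare_digest runs in constant time; bytes inputs accept any header text.
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))
```

The digest has to be computed over the bytes exactly as received; re-serializing parsed JSON would generally change the body and break the comparison.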

View File

@@ -1,444 +0,0 @@
from __future__ import annotations
import importlib.util
import json
import sqlite3
import sys
from pathlib import Path
SCRIPT_PATH = (
Path(__file__).resolve().parents[2]
/ "optional-skills"
/ "observability"
/ "vercel-observability-loop"
/ "scripts"
/ "vercel_observability.py"
)
def load_module():
spec = importlib.util.spec_from_file_location("vercel_observability_skill", SCRIPT_PATH)
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
def test_preflight_reads_vercel_linked_project(tmp_path: Path, monkeypatch):
mod = load_module()
project_dir = tmp_path / "app"
(project_dir / ".vercel").mkdir(parents=True)
(project_dir / ".vercel" / "project.json").write_text(
json.dumps(
{
"projectId": "prj_123",
"orgId": "team_456",
"projectName": "demo-app",
}
),
encoding="utf-8",
)
(project_dir / "vercel.json").write_text("{}", encoding="utf-8")
monkeypatch.setattr(mod.shutil, "which", lambda name: "/opt/homebrew/bin/vercel")
def fake_run(cmd, **kwargs):
joined = " ".join(cmd)
if joined == "vercel --version":
return mod.subprocess.CompletedProcess(cmd, 0, stdout="Vercel CLI 50.31.0\n", stderr="")
if joined == "vercel --help":
return mod.subprocess.CompletedProcess(cmd, 0, stdout="Commands:\n api\n", stderr="")
if joined == "vercel whoami --no-color --non-interactive":
return mod.subprocess.CompletedProcess(cmd, 0, stdout="rewbs\n", stderr="")
raise AssertionError(f"Unexpected command: {cmd}")
monkeypatch.setattr(mod, "run_command", fake_run)
result = mod.run_preflight(project_dir)
assert result["project"]["linked"] is True
assert result["project"]["project_id"] == "prj_123"
assert result["cli"]["logged_in"] is True
assert result["recommended_mode"] == "runtime-or-drain"
def test_collect_runtime_logs_persists_json_lines(tmp_path: Path, monkeypatch):
mod = load_module()
runtime_paths = mod.resolve_paths(tmp_path / ".hermes" / "observability")
sample_lines = "\n".join(
[
json.dumps(
{
"timestamp": "2026-03-16T01:02:03Z",
"level": "error",
"message": "Database timeout for /api/orders",
"path": "/api/orders",
"statusCode": 500,
"requestId": "req_123",
"source": "serverless",
}
),
json.dumps(
{
"timestamp": "2026-03-16T01:04:05Z",
"level": "info",
"message": "render home page",
"path": "/",
"statusCode": 200,
"requestId": "req_456",
"source": "edge-function",
}
),
]
)
def fake_run(cmd, **kwargs):
return mod.subprocess.CompletedProcess(cmd, 0, stdout=sample_lines, stderr="")
monkeypatch.setattr(mod, "run_command", fake_run)
result = mod.collect_runtime_logs(
cwd=tmp_path,
base_dir=runtime_paths["state_dir"],
since="30m",
until=None,
project=None,
environment=None,
level=None,
source=None,
limit=100,
search=None,
request_id=None,
status_code=None,
)
assert result["success"] is True
assert result["stored"] == 2
conn = sqlite3.connect(runtime_paths["db_path"])
try:
count = conn.execute("SELECT COUNT(*) FROM log_events").fetchone()[0]
finally:
conn.close()
assert count == 2
def test_verify_signature_uses_hmac_sha1():
mod = load_module()
body = b'{"message":"hello"}'
secret = "shared-secret"
signature = mod.hmac.new(secret.encode("utf-8"), body, "sha1").hexdigest()
assert mod.verify_signature(body, signature, secret) is True
assert mod.verify_signature(body, "bad-signature", secret) is False
def test_build_drain_payload_includes_self_served_source():
mod = load_module()
payload = mod.build_drain_payload(
name="hermes-observability",
target_url="https://example.trycloudflare.com",
project_id="prj_123",
sources=["serverless", "static"],
headers={"X-Test": "1"},
secret="secret",
delivery_format="json",
)
assert payload["name"] == "hermes-observability"
assert payload["projectIds"] == ["prj_123"]
assert payload["source"] == {"kind": "self-served"}
assert payload["sources"] == ["serverless", "static"]
assert payload["headers"]["X-Test"] == "1"
def test_normalize_log_record_handles_vercel_millisecond_timestamps_and_empty_messages():
mod = load_module()
record = mod.normalize_log_record(
{
"id": "4chxq-1773620260046-25aa70eb0443",
"timestamp": 1773620260046,
"deploymentId": "dpl_123",
"projectId": "prj_123",
"level": "info",
"message": "",
"source": "serverless",
"domain": "portal.nousresearch.com",
"requestMethod": "POST",
"requestPath": "/refresh",
"responseStatusCode": 0,
"environment": "production",
"traceId": "",
},
"runtime",
)
assert record["observed_at"] == "2026-03-16T00:17:40.046000Z"
assert record["path"] == "/refresh"
assert record["host"] == "portal.nousresearch.com"
assert record["status_code"] == 0
assert record["message"] == "POST /refresh -> 0 serverless"
assert record["request_id"] == "4chxq-1773620260046-25aa70eb0443"
def test_analyze_rows_flags_noisy_and_missing_context(tmp_path: Path):
mod = load_module()
repo_root = tmp_path / "repo"
(repo_root / "app" / "api").mkdir(parents=True)
(repo_root / "app" / "api" / "orders.ts").write_text("export function handler() {}", encoding="utf-8")
rows = []
for index in range(12):
rows.append(
{
"fingerprint": "noise",
"origin": "runtime",
"source": "edge-function",
"level": "info",
"status_code": 200,
"request_id": f"req_{index}",
"deployment_id": None,
"environment": "preview",
"path": "/",
"host": None,
"message": "Rendered landing page",
"raw_json": "{}",
}
)
rows.append(
{
"fingerprint": "bug",
"origin": "runtime",
"source": "serverless",
"level": "error",
"status_code": 500,
"request_id": None,
"deployment_id": None,
"environment": "production",
"path": "/api/orders",
"host": None,
"message": "Internal Server Error",
"raw_json": "{}",
}
)
analysis = mod.analyze_rows(rows, repo_root, sample_limit=3)
assert analysis["summary"]["bug_candidates"] >= 1
assert analysis["summary"]["noisy_log_candidates"] >= 1
assert analysis["summary"]["missing_context_candidates"] >= 1
assert any("orders.ts" in ",".join(item["likely_files"]) for item in analysis["bug_candidates"])
def test_live_session_runs_end_to_end_and_scopes_analysis(tmp_path: Path, monkeypatch):
mod = load_module()
runtime_paths = mod.resolve_paths(tmp_path / ".hermes" / "observability")
calls: dict[str, object] = {}
monkeypatch.setattr(
mod,
"run_preflight",
lambda cwd: {
"success": True,
"cli": {"installed": True, "logged_in": True, "api_supported": True},
"project": {"project_id": "prj_123"},
},
)
monkeypatch.setattr(
mod,
"start_receiver_background",
lambda **kwargs: {
"success": True,
"server": object(),
"thread": object(),
"startup": {
"listening": "http://127.0.0.1:4319",
"port": 4319,
"db_path": str(runtime_paths["db_path"]),
"raw_dir": str(runtime_paths["raw_dir"]),
},
},
)
monkeypatch.setattr(
mod,
"start_tunnel",
lambda **kwargs: {
"success": True,
"provider": "cloudflared",
"public_url": "https://demo.trycloudflare.com",
"command": ["cloudflared", "tunnel"],
"process": object(),
"reader_thread": object(),
},
)
monkeypatch.setattr(
mod,
"ensure_drain",
lambda **kwargs: {
"success": True,
"action": "create",
"response": {"json": {"id": "drn_123"}},
},
)
row_ids = iter([10, 16])
monkeypatch.setattr(mod, "get_max_row_id", lambda db_path: next(row_ids))
monkeypatch.setattr(mod.time, "sleep", lambda seconds: calls.setdefault("slept", seconds))
monkeypatch.setattr(
mod,
"delete_drain",
lambda **kwargs: {"success": True, "deleted": kwargs["drain_id"]},
)
monkeypatch.setattr(mod, "stop_tunnel", lambda process, reader_thread: {"success": True, "status": "stopped"})
monkeypatch.setattr(mod, "stop_receiver_background", lambda server, thread: {"success": True, "status": "stopped"})
def fake_analyze_database(**kwargs):
calls["analyze_kwargs"] = kwargs
return {
"success": True,
"report_path": str(kwargs["report_path"]),
"analysis": {
"summary": {
"records": 2,
"clusters": 1,
"bug_candidates": 1,
"noisy_log_candidates": 0,
"missing_context_candidates": 0,
}
},
}
monkeypatch.setattr(mod, "analyze_database", fake_analyze_database)
result = mod.run_live_session(
cwd=tmp_path,
base_dir=runtime_paths["state_dir"],
minutes=0.05,
bind="127.0.0.1",
port=4319,
secret="shared-secret",
name_prefix="session",
project_id=None,
scope=None,
sources=["serverless"],
headers=None,
delivery_format="json",
tunnel="auto",
tunnel_timeout=10.0,
environment="production",
limit=250,
sample_limit=15,
report_path=None,
)
assert result["success"] is True
assert calls["slept"] == 3.0
assert result["session"]["drain_id"] == "drn_123"
assert result["session"]["drain_name"].startswith("session-")
assert result["cleanup"]["drain"]["deleted"] == "drn_123"
analyze_kwargs = calls["analyze_kwargs"]
assert analyze_kwargs["origins"] == ["drain"]
assert analyze_kwargs["min_row_id"] == 10
assert analyze_kwargs["max_row_id"] == 16
assert analyze_kwargs["environment"] == "production"
assert analyze_kwargs["limit"] == 250
assert analyze_kwargs["sample_limit"] == 15
assert analyze_kwargs["report_path"].name.startswith("live-session-")
def test_live_session_cleans_up_receiver_and_tunnel_when_drain_creation_fails(tmp_path: Path, monkeypatch):
mod = load_module()
runtime_paths = mod.resolve_paths(tmp_path / ".hermes" / "observability")
calls = {"stop_tunnel": 0, "stop_receiver": 0, "analyze": 0}
monkeypatch.setattr(
mod,
"run_preflight",
lambda cwd: {
"success": True,
"cli": {"installed": True, "logged_in": True, "api_supported": True},
"project": {"project_id": "prj_123"},
},
)
monkeypatch.setattr(
mod,
"start_receiver_background",
lambda **kwargs: {
"success": True,
"server": object(),
"thread": object(),
"startup": {
"listening": "http://127.0.0.1:4319",
"port": 4319,
"db_path": str(runtime_paths["db_path"]),
"raw_dir": str(runtime_paths["raw_dir"]),
},
},
)
monkeypatch.setattr(
mod,
"start_tunnel",
lambda **kwargs: {
"success": True,
"provider": "cloudflared",
"public_url": "https://demo.trycloudflare.com",
"command": ["cloudflared", "tunnel"],
"process": object(),
"reader_thread": object(),
},
)
monkeypatch.setattr(
mod,
"ensure_drain",
lambda **kwargs: {"success": False, "phase": "create", "response": {"stderr": "boom"}},
)
monkeypatch.setattr(mod, "get_max_row_id", lambda db_path: 0)
monkeypatch.setattr(
mod,
"stop_tunnel",
lambda process, reader_thread: calls.__setitem__("stop_tunnel", calls["stop_tunnel"] + 1) or {"success": True},
)
monkeypatch.setattr(
mod,
"stop_receiver_background",
lambda server, thread: calls.__setitem__("stop_receiver", calls["stop_receiver"] + 1) or {"success": True},
)
monkeypatch.setattr(mod, "delete_drain", lambda **kwargs: (_ for _ in ()).throw(AssertionError("delete_drain should not be called")))
monkeypatch.setattr(
mod,
"analyze_database",
lambda **kwargs: calls.__setitem__("analyze", calls["analyze"] + 1) or {"success": True},
)
result = mod.run_live_session(
cwd=tmp_path,
base_dir=runtime_paths["state_dir"],
minutes=0.01,
bind="127.0.0.1",
port=4319,
secret="shared-secret",
name_prefix="session",
project_id=None,
scope=None,
sources=["serverless"],
headers=None,
delivery_format="json",
tunnel="auto",
tunnel_timeout=10.0,
environment=None,
limit=None,
sample_limit=20,
report_path=None,
)
assert result["success"] is False
assert result["phase"] == "ensure-drain"
assert calls["stop_tunnel"] == 1
assert calls["stop_receiver"] == 1
assert calls["analyze"] == 0

View File

@@ -68,22 +68,6 @@ class TestAtomicJsonWrite:
tmp_files = [f for f in tmp_path.iterdir() if ".tmp" in f.name]
assert len(tmp_files) == 0
def test_cleans_up_temp_file_on_baseexception(self, tmp_path):
class SimulatedAbort(BaseException):
pass
target = tmp_path / "data.json"
original = {"preserved": True}
target.write_text(json.dumps(original), encoding="utf-8")
with patch("utils.json.dump", side_effect=SimulatedAbort):
with pytest.raises(SimulatedAbort):
atomic_json_write(target, {"new": True})
tmp_files = [f for f in tmp_path.iterdir() if ".tmp" in f.name]
assert len(tmp_files) == 0
assert json.loads(target.read_text(encoding="utf-8")) == original
def test_accepts_string_path(self, tmp_path):
target = str(tmp_path / "string_path.json")
atomic_json_write(target, {"string": True})

View File

@@ -1,44 +0,0 @@
"""Tests for utils.atomic_yaml_write — crash-safe YAML file writes."""
from pathlib import Path
from unittest.mock import patch
import pytest
import yaml
from utils import atomic_yaml_write
class TestAtomicYamlWrite:
def test_writes_valid_yaml(self, tmp_path):
target = tmp_path / "data.yaml"
data = {"key": "value", "nested": {"a": 1}}
atomic_yaml_write(target, data)
assert yaml.safe_load(target.read_text(encoding="utf-8")) == data
def test_cleans_up_temp_file_on_baseexception(self, tmp_path):
class SimulatedAbort(BaseException):
pass
target = tmp_path / "data.yaml"
original = {"preserved": True}
target.write_text(yaml.safe_dump(original), encoding="utf-8")
with patch("utils.yaml.dump", side_effect=SimulatedAbort):
with pytest.raises(SimulatedAbort):
atomic_yaml_write(target, {"new": True})
tmp_files = [f for f in tmp_path.iterdir() if ".tmp" in f.name]
assert len(tmp_files) == 0
assert yaml.safe_load(target.read_text(encoding="utf-8")) == original
def test_appends_extra_content(self, tmp_path):
target = tmp_path / "data.yaml"
atomic_yaml_write(target, {"key": "value"}, extra_content="\n# comment\n")
text = target.read_text(encoding="utf-8")
assert "key: value" in text
assert "# comment" in text

View File

@@ -1,10 +1,8 @@
"""Tests for tools/checkpoint_manager.py — CheckpointManager."""
import logging
import os
import json
import shutil
import subprocess
import pytest
from pathlib import Path
from unittest.mock import patch
@@ -145,12 +143,6 @@ class TestTakeCheckpoint:
result = mgr.ensure_checkpoint(str(work_dir), "initial")
assert result is True
def test_successful_checkpoint_does_not_log_expected_diff_exit(self, mgr, work_dir, caplog):
with caplog.at_level(logging.ERROR, logger="tools.checkpoint_manager"):
result = mgr.ensure_checkpoint(str(work_dir), "initial")
assert result is True
assert not any("diff --cached --quiet" in r.getMessage() for r in caplog.records)
def test_dedup_same_turn(self, mgr, work_dir):
r1 = mgr.ensure_checkpoint(str(work_dir), "first")
r2 = mgr.ensure_checkpoint(str(work_dir), "second")
@@ -383,26 +375,6 @@ class TestErrorResilience:
result = mgr.ensure_checkpoint(str(work_dir), "test")
assert result is False
def test_run_git_allows_expected_nonzero_without_error_log(self, tmp_path, caplog):
completed = subprocess.CompletedProcess(
args=["git", "diff", "--cached", "--quiet"],
returncode=1,
stdout="",
stderr="",
)
with patch("tools.checkpoint_manager.subprocess.run", return_value=completed):
with caplog.at_level(logging.ERROR, logger="tools.checkpoint_manager"):
ok, stdout, stderr = _run_git(
["diff", "--cached", "--quiet"],
tmp_path / "shadow",
str(tmp_path / "work"),
allowed_returncodes={1},
)
assert ok is False
assert stdout == ""
assert stderr == ""
assert not caplog.records
def test_checkpoint_failure_does_not_raise(self, mgr, work_dir, monkeypatch):
"""Checkpoint failures should never raise — they're silently logged."""
def broken_run_git(*args, **kwargs):

View File

@@ -1,4 +1,5 @@
import logging
from io import StringIO
import subprocess
import pytest
@@ -86,3 +87,64 @@ def test_ensure_docker_available_uses_resolved_executable(monkeypatch):
})
]
class _FakePopen:
def __init__(self, cmd, **kwargs):
self.cmd = cmd
self.kwargs = kwargs
self.stdout = StringIO("")
self.stdin = None
self.returncode = 0
def poll(self):
return self.returncode
def _make_execute_only_env(forward_env=None):
env = docker_env.DockerEnvironment.__new__(docker_env.DockerEnvironment)
env.cwd = "/root"
env.timeout = 60
env._forward_env = forward_env or []
env._prepare_command = lambda command: (command, None)
env._timeout_result = lambda timeout: {"output": f"timed out after {timeout}", "returncode": 124}
env._inner = type("Inner", (), {
"container_id": "test-container",
"config": type("Cfg", (), {"executable": "/usr/bin/docker", "env": {}})(),
})()
return env
def test_execute_uses_hermes_dotenv_for_allowlisted_env(monkeypatch):
env = _make_execute_only_env(["GITHUB_TOKEN"])
popen_calls = []
def _fake_popen(cmd, **kwargs):
popen_calls.append(cmd)
return _FakePopen(cmd, **kwargs)
monkeypatch.delenv("GITHUB_TOKEN", raising=False)
monkeypatch.setattr(docker_env, "_load_hermes_env_vars", lambda: {"GITHUB_TOKEN": "value_from_dotenv"})
monkeypatch.setattr(docker_env.subprocess, "Popen", _fake_popen)
result = env.execute("echo hi")
assert result["returncode"] == 0
assert "GITHUB_TOKEN=value_from_dotenv" in popen_calls[0]
def test_execute_prefers_shell_env_over_hermes_dotenv(monkeypatch):
env = _make_execute_only_env(["GITHUB_TOKEN"])
popen_calls = []
def _fake_popen(cmd, **kwargs):
popen_calls.append(cmd)
return _FakePopen(cmd, **kwargs)
monkeypatch.setenv("GITHUB_TOKEN", "value_from_shell")
monkeypatch.setattr(docker_env, "_load_hermes_env_vars", lambda: {"GITHUB_TOKEN": "value_from_dotenv"})
monkeypatch.setattr(docker_env.subprocess, "Popen", _fake_popen)
env.execute("echo hi")
assert "GITHUB_TOKEN=value_from_shell" in popen_calls[0]
assert "GITHUB_TOKEN=value_from_dotenv" not in popen_calls[0]

View File

@@ -5,7 +5,6 @@ handling without requiring a running terminal environment.
"""
import json
import logging
from unittest.mock import MagicMock, patch
from tools.file_tools import (
@@ -88,26 +87,13 @@ class TestWriteFileHandler:
mock_ops.write_file.assert_called_once_with("/tmp/out.txt", "hello world!\n")
@patch("tools.file_tools._get_file_ops")
-def test_permission_error_returns_error_json_without_error_log(self, mock_get, caplog):
+def test_exception_returns_error_json(self, mock_get):
mock_get.side_effect = PermissionError("read-only filesystem")
from tools.file_tools import write_file_tool
-with caplog.at_level(logging.DEBUG, logger="tools.file_tools"):
-result = json.loads(write_file_tool("/tmp/out.txt", "data"))
+result = json.loads(write_file_tool("/tmp/out.txt", "data"))
assert "error" in result
assert "read-only" in result["error"]
-assert any("write_file expected denial" in r.getMessage() for r in caplog.records)
-assert not any(r.levelno >= logging.ERROR for r in caplog.records)
-@patch("tools.file_tools._get_file_ops")
-def test_unexpected_exception_still_logs_error(self, mock_get, caplog):
-mock_get.side_effect = RuntimeError("boom")
-from tools.file_tools import write_file_tool
-with caplog.at_level(logging.ERROR, logger="tools.file_tools"):
-result = json.loads(write_file_tool("/tmp/out.txt", "data"))
-assert result["error"] == "boom"
-assert any("write_file error" in r.getMessage() for r in caplog.records)
class TestPatchHandler:

View File

@@ -30,6 +30,28 @@ class TestParseEnvVar:
result = _parse_env_var("TERMINAL_DOCKER_VOLUMES", "[]", json.loads, "valid JSON")
assert result == ["/host:/container"]
def test_get_env_config_parses_docker_forward_env_json(self):
with patch.dict("os.environ", {
"TERMINAL_ENV": "docker",
"TERMINAL_DOCKER_FORWARD_ENV": '["GITHUB_TOKEN", "NPM_TOKEN"]',
}, clear=False):
config = _tt_mod._get_env_config()
assert config["docker_forward_env"] == ["GITHUB_TOKEN", "NPM_TOKEN"]
def test_create_environment_passes_docker_forward_env(self):
fake_env = object()
with patch.object(_tt_mod, "_DockerEnvironment", return_value=fake_env) as mock_docker:
result = _tt_mod._create_environment(
"docker",
image="python:3.11",
cwd="/root",
timeout=180,
container_config={"docker_forward_env": ["GITHUB_TOKEN"]},
)
assert result is fake_env
assert mock_docker.call_args.kwargs["forward_env"] == ["GITHUB_TOKEN"]
def test_falls_back_to_default(self):
with patch.dict("os.environ", {}, clear=False):
# Remove the var if it exists, rely on default
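The tests above feed `TERMINAL_DOCKER_FORWARD_ENV` as a JSON array. A minimal sketch of that parsing step, assuming a hypothetical helper `parse_forward_env` (the real code goes through `_parse_env_var` and `_get_env_config`, whose bodies are not shown in this diff):

```python
import json
from typing import List, Mapping


def parse_forward_env(environ: Mapping[str, str]) -> List[str]:
    """Read TERMINAL_DOCKER_FORWARD_ENV as a JSON list; unset means empty."""
    raw = environ.get("TERMINAL_DOCKER_FORWARD_ENV", "[]")
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"TERMINAL_DOCKER_FORWARD_ENV must be valid JSON: {exc}") from exc
    if not isinstance(parsed, list):
        raise ValueError("TERMINAL_DOCKER_FORWARD_ENV must be a JSON list of names")
    return parsed
```

Defaulting to an empty list keeps the allowlist opt-in: nothing is forwarded unless a name is listed explicitly.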

View File

@@ -92,17 +92,10 @@ def _run_git(
shadow_repo: Path,
working_dir: str,
timeout: int = _GIT_TIMEOUT,
-allowed_returncodes: Optional[Set[int]] = None,
) -> tuple:
-"""Run a git command against the shadow repo. Returns (ok, stdout, stderr).
-``allowed_returncodes`` suppresses error logging for known/expected non-zero
-exits while preserving the normal ``ok = (returncode == 0)`` contract.
-Example: ``git diff --cached --quiet`` returns 1 when changes exist.
-"""
+"""Run a git command against the shadow repo. Returns (ok, stdout, stderr)."""
env = _git_env(shadow_repo, working_dir)
cmd = ["git"] + list(args)
-allowed_returncodes = allowed_returncodes or set()
try:
result = subprocess.run(
cmd,
@@ -115,7 +108,7 @@ def _run_git(
ok = result.returncode == 0
stdout = result.stdout.strip()
stderr = result.stderr.strip()
-if not ok and result.returncode not in allowed_returncodes:
+if not ok:
logger.error(
"Git command failed: %s (rc=%d) stderr=%s",
" ".join(cmd), result.returncode, stderr,
@@ -388,10 +381,7 @@ class CheckpointManager:
# Check if there's anything to commit
ok_diff, diff_out, _ = _run_git(
-["diff", "--cached", "--quiet"],
-shadow,
-working_dir,
-allowed_returncodes={1},
+["diff", "--cached", "--quiet"], shadow, working_dir,
)
if ok_diff:
# No changes to commit
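For context on this hunk: `git diff --quiet` implies `--exit-code`, so a return code of 1 means staged differences exist rather than a failure. With `allowed_returncodes` removed, `_run_git` logs that exit at error level like any other non-zero status. The sketch below only illustrates the three outcomes; `classify_diff_quiet` is a hypothetical name and not part of `checkpoint_manager`.

```python
def classify_diff_quiet(returncode: int) -> str:
    """Map the exit status of `git diff --cached --quiet` to a meaning."""
    if returncode == 0:
        return "no-staged-changes"  # nothing to commit
    if returncode == 1:
        return "staged-changes"     # expected when a checkpoint has work to record
    return "git-error"              # any other status is a genuine failure
```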

View File

@@ -7,6 +7,7 @@ persistence via bind mounts.
import logging
import os
import re
import shutil
import subprocess
import sys
@@ -30,6 +31,42 @@ _DOCKER_SEARCH_PATHS = [
]
_docker_executable: Optional[str] = None # resolved once, cached
_ENV_VAR_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
def _normalize_forward_env_names(forward_env: list[str] | None) -> list[str]:
"""Return a deduplicated list of valid environment variable names."""
normalized: list[str] = []
seen: set[str] = set()
for item in forward_env or []:
if not isinstance(item, str):
logger.warning("Ignoring non-string docker_forward_env entry: %r", item)
continue
key = item.strip()
if not key:
continue
if not _ENV_VAR_NAME_RE.match(key):
logger.warning("Ignoring invalid docker_forward_env entry: %r", item)
continue
if key in seen:
continue
seen.add(key)
normalized.append(key)
return normalized
def _load_hermes_env_vars() -> dict[str, str]:
"""Load ~/.hermes/.env values without failing Docker command execution."""
try:
from hermes_cli.config import load_env
return load_env() or {}
except Exception:
return {}
def find_docker() -> Optional[str]:
@@ -171,6 +208,7 @@ class DockerEnvironment(BaseEnvironment):
persistent_filesystem: bool = False,
task_id: str = "default",
volumes: list = None,
forward_env: list[str] | None = None,
network: bool = True,
):
if cwd == "~":
@@ -179,6 +217,7 @@ class DockerEnvironment(BaseEnvironment):
self._base_image = image
self._persistent = persistent_filesystem
self._task_id = task_id
self._forward_env = _normalize_forward_env_names(forward_env)
self._container_id: Optional[str] = None
logger.info(f"DockerEnvironment volumes: {volumes}")
# Ensure volumes is a list (config.yaml could be malformed)
@@ -330,8 +369,12 @@ class DockerEnvironment(BaseEnvironment):
if effective_stdin is not None:
cmd.append("-i")
cmd.extend(["-w", work_dir])
for key in self._inner.config.forward_env:
if (value := os.getenv(key)) is not None:
hermes_env = _load_hermes_env_vars() if self._forward_env else {}
for key in self._forward_env:
value = os.getenv(key)
if value is None:
value = hermes_env.get(key)
if value is not None:
cmd.extend(["-e", f"{key}={value}"])
for key, value in self._inner.config.env.items():
cmd.extend(["-e", f"{key}={value}"])


@@ -1,7 +1,6 @@
#!/usr/bin/env python3
"""File Tools Module - LLM agent file manipulation tools."""
import errno
import json
import logging
import os
@@ -12,18 +11,6 @@ from agent.redact import redact_sensitive_text
logger = logging.getLogger(__name__)
_EXPECTED_WRITE_ERRNOS = {errno.EACCES, errno.EPERM, errno.EROFS}
def _is_expected_write_exception(exc: Exception) -> bool:
"""Return True for expected write denials that should not hit error logs."""
if isinstance(exc, PermissionError):
return True
if isinstance(exc, OSError) and exc.errno in _EXPECTED_WRITE_ERRNOS:
return True
return False
_file_ops_lock = threading.Lock()
_file_ops_cache: dict = {}
@@ -251,10 +238,7 @@ def write_file_tool(path: str, content: str, task_id: str = "default") -> str:
result = file_ops.write_file(path, content)
return json.dumps(result.to_dict(), ensure_ascii=False)
except Exception as e:
if _is_expected_write_exception(e):
logger.debug("write_file expected denial: %s: %s", type(e).__name__, e)
else:
logger.error("write_file error: %s: %s", type(e).__name__, e, exc_info=True)
logger.error("write_file error: %s: %s", type(e).__name__, e)
return json.dumps({"error": str(e)}, ensure_ascii=False)


@@ -492,6 +492,7 @@ def _get_env_config() -> Dict[str, Any]:
return {
"env_type": env_type,
"docker_image": os.getenv("TERMINAL_DOCKER_IMAGE", default_image),
"docker_forward_env": _parse_env_var("TERMINAL_DOCKER_FORWARD_ENV", "[]", json.loads, "valid JSON"),
"singularity_image": os.getenv("TERMINAL_SINGULARITY_IMAGE", f"docker://{default_image}"),
"modal_image": os.getenv("TERMINAL_MODAL_IMAGE", default_image),
"daytona_image": os.getenv("TERMINAL_DAYTONA_IMAGE", default_image),
@@ -536,6 +537,7 @@ def _create_environment(env_type: str, image: str, cwd: str, timeout: int,
disk = cc.get("container_disk", 51200)
persistent = cc.get("container_persistent", True)
volumes = cc.get("docker_volumes", [])
docker_forward_env = cc.get("docker_forward_env", [])
if env_type == "local":
return _LocalEnvironment(cwd=cwd, timeout=timeout)
@@ -546,6 +548,7 @@ def _create_environment(env_type: str, image: str, cwd: str, timeout: int,
cpu=cpu, memory=memory, disk=disk,
persistent_filesystem=persistent, task_id=task_id,
volumes=volumes,
forward_env=docker_forward_env,
)
elif env_type == "singularity":


@@ -50,8 +50,6 @@ def atomic_json_write(
os.fsync(f.fileno())
os.replace(tmp_path, path)
except BaseException:
# Intentionally catch BaseException so temp-file cleanup still runs for
# KeyboardInterrupt/SystemExit before re-raising the original signal.
try:
os.unlink(tmp_path)
except OSError:
@@ -98,8 +96,6 @@ def atomic_yaml_write(
os.fsync(f.fileno())
os.replace(tmp_path, path)
except BaseException:
# Match atomic_json_write: cleanup must also happen for process-level
# interruptions before we re-raise them.
try:
os.unlink(tmp_path)
except OSError:


@@ -76,6 +76,7 @@ For native Anthropic auth, Hermes prefers Claude Code's own credential files whe
|----------|-------------|
| `TERMINAL_ENV` | Backend: `local`, `docker`, `ssh`, `singularity`, `modal`, `daytona` |
| `TERMINAL_DOCKER_IMAGE` | Docker image (default: `python:3.11`) |
| `TERMINAL_DOCKER_FORWARD_ENV` | JSON array of env var names to explicitly forward into Docker terminal sessions |
| `TERMINAL_DOCKER_VOLUMES` | Additional Docker volume mounts (comma-separated `host:container` pairs) |
| `TERMINAL_SINGULARITY_IMAGE` | Singularity image or `.sif` path |
| `TERMINAL_MODAL_IMAGE` | Modal container image |


@@ -453,6 +453,8 @@ terminal:
# Docker-specific settings
docker_image: "nikolaik/python-nodejs:python3.11-nodejs20"
docker_forward_env: # Optional explicit allowlist for env passthrough
- "GITHUB_TOKEN"
docker_volumes: # Share host directories with the container
- "/home/user/projects:/workspace/projects"
- "/home/user/data:/data:ro" # :ro for read-only
@@ -517,6 +519,24 @@ This is useful for:
Can also be set via environment variable: `TERMINAL_DOCKER_VOLUMES='["/host:/container"]'` (JSON array).
### Docker Credential Forwarding
By default, Docker terminal sessions do not inherit arbitrary host credentials. If you need a specific token inside the container, add it to `terminal.docker_forward_env`.
```yaml
terminal:
backend: docker
docker_forward_env:
- "GITHUB_TOKEN"
- "NPM_TOKEN"
```
Hermes resolves each listed variable from your current shell first, then falls back to `~/.hermes/.env` if it was saved with `hermes config set`.
:::warning
Anything listed in `docker_forward_env` becomes visible to commands run inside the container. Only forward credentials you are comfortable exposing to the terminal session.
:::
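The lookup order described above (current shell first, then `~/.hermes/.env`) can be sketched roughly as follows. This is an illustrative sketch, not the Hermes implementation: `resolve_forwarded_env` and `docker_env_args` are hypothetical names, and the `saved` dict stands in for whatever values were loaded from `~/.hermes/.env`.

```python
import os


def resolve_forwarded_env(names, fallback, environ=None):
    """Resolve allowlisted names: process environment first, then fallback.

    `fallback` is a plain dict standing in for values saved in
    ~/.hermes/.env (an assumption made for this sketch).
    """
    environ = os.environ if environ is None else environ
    resolved = {}
    for name in names:
        value = environ.get(name)
        if value is None:
            # Not set in the shell: fall back to the saved value, if any.
            value = fallback.get(name)
        if value is not None:
            resolved[name] = value
    return resolved


def docker_env_args(resolved):
    """Turn resolved variables into `-e KEY=VALUE` arguments for Docker."""
    args = []
    for key, value in resolved.items():
        args.extend(["-e", f"{key}={value}"])
    return args


# Hypothetical inputs: the shell defines GITHUB_TOKEN, the saved file defines both.
shell = {"GITHUB_TOKEN": "from-shell"}
saved = {"GITHUB_TOKEN": "from-dotenv", "NPM_TOKEN": "from-dotenv"}

resolved = resolve_forwarded_env(
    ["GITHUB_TOKEN", "NPM_TOKEN", "MISSING"], saved, environ=shell
)
print(docker_env_args(resolved))
# → ['-e', 'GITHUB_TOKEN=from-shell', '-e', 'NPM_TOKEN=from-dotenv']
```

The shell value wins for `GITHUB_TOKEN`, `NPM_TOKEN` comes from the fallback, and a name that is set nowhere is simply skipped rather than forwarded as an empty value.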
See [Code Execution](features/code-execution.md) and the [Terminal section of the README](features/tools.md) for details on each backend.
## Memory Configuration

View File

@@ -135,6 +135,8 @@ All container backends run with security hardening:
- Full namespace isolation
- Persistent workspace via volumes, not writable root layer
Docker can optionally receive an explicit env allowlist via `terminal.docker_forward_env`, but forwarded variables are visible to commands inside the container and should be treated as exposed to that session.
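As a rough illustration of what an explicit allowlist means in practice, the sketch below parses a JSON-array value such as the one accepted by `TERMINAL_DOCKER_FORWARD_ENV` and keeps only well-formed, unique variable names. The function name `parse_forward_env` is hypothetical, and returning an empty list for malformed input is an assumption of this sketch rather than documented Hermes behavior.

```python
import json
import re

# Conventional shell variable name: letter or underscore, then letters, digits, underscores.
ENV_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def parse_forward_env(raw):
    """Parse a JSON array of env var names into a clean allowlist.

    Hypothetical helper: malformed JSON or a non-list value yields an
    empty allowlist, so nothing is forwarded by accident.
    """
    try:
        items = json.loads(raw)
    except json.JSONDecodeError:
        return []
    if not isinstance(items, list):
        return []

    names = []
    for item in items:
        if not isinstance(item, str):
            continue  # ignore non-string entries
        key = item.strip()
        if key and ENV_NAME_RE.match(key) and key not in names:
            names.append(key)
    return names


print(parse_forward_env('["GITHUB_TOKEN", " NPM_TOKEN ", "BAD-NAME", 42, "GITHUB_TOKEN"]'))
# → ['GITHUB_TOKEN', 'NPM_TOKEN']
```

Only names that survive this kind of filtering would be candidates for forwarding; everything else in the host environment stays out of the container.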
## Background Process Management
Start background processes and manage them:


@@ -212,6 +212,7 @@ Container resources are configurable in `~/.hermes/config.yaml`:
terminal:
backend: docker
docker_image: "nikolaik/python-nodejs:python3.11-nodejs20"
docker_forward_env: [] # Explicit allowlist only; empty keeps secrets out of the container
container_cpu: 1 # CPU cores
container_memory: 5120 # MB (default 5GB)
container_disk: 51200 # MB (default 50GB, requires overlay2 on XFS)
@@ -227,6 +228,10 @@ terminal:
For production gateway deployments, use `docker`, `modal`, or `daytona` backend to isolate agent commands from your host system. This eliminates the need for dangerous command approval entirely.
:::
:::warning
If you add names to `terminal.docker_forward_env`, those variables are intentionally injected into the container for terminal commands. This is useful for task-specific credentials like `GITHUB_TOKEN`, but it also means code running in the container can read and exfiltrate them.
:::
## Terminal Backend Security Comparison
| Backend | Isolation | Dangerous Cmd Check | Best For |