Compare commits


5 Commits

Author SHA1 Message Date
Robin Fernandes
dd808a7c43 First pass at a vercel log analysis skill 2026-03-16 12:10:25 +11:00
Teknium
64d333204b Merge pull request #1242 from NousResearch/fix/file-tool-log-noise
fix: reduce file tool log noise
2026-03-15 11:11:18 -07:00
Teknium
c44af43840 Merge pull request #1401 from NousResearch/hermes/hermes-eca4a640
test: protect atomic temp cleanup on interrupts
2026-03-15 11:10:41 -07:00
teknium1
b117bbc125 test: cover atomic temp cleanup on interrupts
- add regression coverage for BaseException cleanup in atomic_json_write
- add dedicated atomic_yaml_write tests, including interrupt cleanup
- document why BaseException is intentional in both helpers
2026-03-14 22:31:51 -07:00
teknium1
b59da08730 fix: reduce file tool log noise
- treat git diff --cached --quiet rc=1 as an expected checkpoint state
  instead of logging it as an error
- downgrade expected write PermissionError/EROFS/EACCES failures out of
  error logging while keeping unexpected exceptions at error level
- add regression tests for both logging behaviors
2026-03-13 22:14:00 -07:00
11 changed files with 2599 additions and 6 deletions

View File

@@ -0,0 +1,193 @@
---
name: vercel-obs
description: Investigate Vercel-deployed apps by collecting runtime logs or configuring a drain to a local receiver, correlating the data with the current codebase, and producing bug-focused observability reports.
version: 1.0.0
author: Hermes Agent
license: MIT
metadata:
  hermes:
    tags: [vercel, observability, logging, debugging, production]
    related_skills: [native-mcp]
---
# Vercel Obs
Use this skill when the current app is deployed to Vercel and the user wants a code-aware observability pass over recent runtime logs or a temporary drain-backed capture session.
## Prerequisites
- `vercel` CLI installed and logged in
- Repo linked to a Vercel project, or the user can provide a project id/name
- For one-shot live drain capture: `cloudflared` or `ngrok` installed locally
- Drain support is plan-dependent; if drains are unavailable, fall back to runtime-log analysis
## Helper Script
Invoke the helper script from its installed path:
```bash
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py
```
Read `references/vercel.md` if you need the current Vercel constraints or API assumptions.
## Workflow
### 1. Preflight
Always start with:
```bash
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py preflight
```
This checks for:
- linked Vercel project metadata
- CLI availability and version
- current Vercel login state
- whether `vercel api` is available for drain operations
If the repo is not linked or the CLI is not authenticated, stop and explain the blocker.
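For orientation, the linkage check amounts to reading `.vercel/project.json`. A minimal sketch, assuming only the field names the repo's tests exercise (`projectId`, `orgId`, `projectName`); the return shape is illustrative:
```python
# Sketch: detect a linked Vercel project. Only the JSON key names are
# grounded in the test fixtures; the return shape is illustrative.
import json
from pathlib import Path

def read_linked_project(cwd: Path) -> dict:
    project_file = cwd / ".vercel" / "project.json"
    if not project_file.exists():
        return {"linked": False}
    data = json.loads(project_file.read_text(encoding="utf-8"))
    return {
        "linked": True,
        "project_id": data.get("projectId"),
        "org_id": data.get("orgId"),
        "project_name": data.get("projectName"),
    }
```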
### 2. Immediate Runtime Analysis
Use runtime logs first so the user gets signal immediately:
```bash
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py collect-runtime --since 30m
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py analyze --since 30m --report-path .hermes/observability/reports/runtime-report.md
```
This path is the default fallback when drain setup is not possible.
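Collected lines are normalized before storage. A condensed sketch of the two quirks the test suite pins down, namely epoch-millisecond timestamps and empty messages (the real normalizer handles more fields; this is illustrative):
```python
# Condensed normalization sketch. Vercel runtime records can carry epoch-ms
# timestamps and empty messages; the tests expect an ISO-8601 observed_at and
# a synthesized "METHOD /path -> status source" fallback message.
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def normalize_sketch(raw: dict) -> dict:
    observed_at = (_EPOCH + timedelta(milliseconds=raw["timestamp"])).strftime(
        "%Y-%m-%dT%H:%M:%S.%fZ"
    )
    message = raw.get("message") or (
        f"{raw.get('requestMethod')} {raw.get('requestPath')} "
        f"-> {raw.get('responseStatusCode')} {raw.get('source')}"
    )
    return {"observed_at": observed_at, "message": message}

# normalize_sketch({"timestamp": 1773620260046, "message": "",
#                   "requestMethod": "POST", "requestPath": "/refresh",
#                   "responseStatusCode": 0, "source": "serverless"})
# -> observed_at "2026-03-16T00:17:40.046000Z",
#    message "POST /refresh -> 0 serverless"
```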
### 3. One-Shot Live Session
For a single-prompt workflow, prefer the built-in orchestration command:
```bash
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py live-session \
--minutes 10 \
--environment production \
--report-path .hermes/observability/reports/live-session.md
```
This command will:
- start the local drain receiver
- launch a tunnel with `cloudflared` or `ngrok`
- create a temporary Vercel drain against the linked project
- collect logs for the requested window
- delete the drain
- stop the tunnel and receiver
- analyze only the rows captured during that session
- write a report
Useful flags:
- `--project-id prj_123` if the repo is not linked
- `--scope team_slug` for team-scoped Vercel access
- `--source serverless --source edge-function` to narrow the capture
- `--tunnel cloudflared` to force a provider
- `--name-prefix hermes-incident` to change the temporary drain name prefix
If the tunnel binary is missing or the drain cannot be created, the script should still clean up the local receiver before exiting with an error.
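The teardown ordering that behavior implies can be sketched as a try/finally ladder; the step names mirror the helper's internals as the tests patch them, while the wiring here is condensed:
```python
# Condensed teardown sketch: a failed drain creation must never trigger
# drain deletion, but the tunnel and receiver are always stopped.
def live_session_sketch(start_receiver, start_tunnel, ensure_drain,
                        delete_drain, stop_tunnel, stop_receiver, capture):
    receiver = start_receiver()
    tunnel = None
    drain_id = None
    try:
        tunnel = start_tunnel()
        created = ensure_drain(tunnel["public_url"])
        if not created["success"]:
            return {"success": False, "phase": "ensure-drain"}
        drain_id = created["response"]["json"]["id"]
        capture()
        return {"success": True, "drain_id": drain_id}
    finally:
        if drain_id is not None:
            delete_drain(drain_id)  # only when the drain actually exists
        if tunnel is not None:
            stop_tunnel(tunnel)
        stop_receiver(receiver)
```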
### 4. Local Receiver for Manual Live Drain Capture
Use the manual steps below only when you need fine-grained control.
Start the receiver in the background:
```bash
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py serve --port 4319 --secret YOUR_SHARED_SECRET
```
Run it through Hermes' background-process support so it stays alive.
The receiver writes to:
- `.hermes/observability/logs.sqlite3`
- `.hermes/observability/raw/`
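To spot-check what has landed, query the SQLite store directly. The `log_events` table and the columns below appear in the repo's tests; treat anything beyond them as an assumption:
```python
# Quick peek at captured error rows; table and column names per the tests.
import sqlite3

conn = sqlite3.connect(".hermes/observability/logs.sqlite3")
try:
    rows = conn.execute(
        "SELECT level, status_code, path, message FROM log_events "
        "WHERE level = 'error' ORDER BY rowid DESC LIMIT 10"
    ).fetchall()
finally:
    conn.close()

for level, status_code, path, message in rows:
    print(f"{level} {status_code} {path}: {message}")
```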
### 5. Expose the Receiver
If the receiver is only listening on localhost, expose it with a tunnel before creating the drain.
Preferred manual pattern:
```bash
cloudflared tunnel --url http://127.0.0.1:4319 --no-autoupdate
```
Parse the public HTTPS URL from the tunnel output. If no tunnel is available, explain that Vercel cannot deliver drains to a private localhost endpoint.
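A hedged sketch of that parsing step; the hostname pattern assumes cloudflared's quick-tunnel output, which prints a `*.trycloudflare.com` URL:
```python
# Sketch: pull the first quick-tunnel URL out of cloudflared's output stream.
# The URL shape is an assumption about quick-tunnel output, not a stable API.
import re

CLOUDFLARE_URL_RE = re.compile(r"https://[\w-]+\.trycloudflare\.com")

def find_public_url(lines):
    """Return the first quick-tunnel URL found in the output lines, else None."""
    for line in lines:
        match = CLOUDFLARE_URL_RE.search(line)
        if match:
            return match.group(0)
    return None
```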
### 6. Create or Reuse the Drain
Once you have a public URL:
```bash
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py ensure-drain \
--name hermes-observability \
--target-url https://example.trycloudflare.com \
--project-id prj_123 \
--secret YOUR_SHARED_SECRET \
--source static \
--source serverless \
--source edge-function
```
If the project id is omitted, the script tries `.vercel/project.json`.
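The payload `ensure-drain` submits looks roughly like the following. The `name`, `projectIds`, `source`, `sources`, and `headers` fields come straight from the repo's payload test; how the secret and delivery format are keyed is an assumption:
```python
# Drain-creation payload sketch; the keys below are asserted in the tests,
# while the custom header name is hypothetical.
payload = {
    "name": "hermes-observability",
    "projectIds": ["prj_123"],
    "source": {"kind": "self-served"},  # marks the drain as user-managed
    "sources": ["static", "serverless", "edge-function"],
    "headers": {"X-Observability-Secret": "YOUR_SHARED_SECRET"},  # hypothetical
}
```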
For teardown:
```bash
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py delete-drain --drain-id d_123
```
### 7. Analyze and Report
Generate a report after enough logs have arrived:
```bash
python ~/.hermes/skills/observability/vercel-observability-loop/scripts/vercel_observability.py analyze \
--since 2h \
--sample-limit 20 \
--report-path .hermes/observability/reports/observability-report.md
```
The report should prioritize:
- bug candidates
- noisy or superfluous logs
- missing context in error logs
- concrete fix proposals tied back to likely files in the repo
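For orientation, the analysis result has roughly this shape. The summary keys and the `likely_files` field match the test assertions; all counts and any other fields are illustrative:
```python
# Illustrative analysis shape; summary keys and likely_files per the tests.
analysis = {
    "summary": {
        "records": 13,
        "clusters": 2,
        "bug_candidates": 1,
        "noisy_log_candidates": 1,
        "missing_context_candidates": 1,
    },
    "bug_candidates": [
        {"likely_files": ["app/api/orders.ts"]},  # correlated against the repo
    ],
}
```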
## Output Expectations
When using this skill, produce:
1. A short status summary of which mode you used: runtime-only or drain-backed
2. The report path
3. The highest-signal findings first
4. Concrete next steps, including drain cleanup if you created one
## Hermes Prompt Patterns
Use prompts like:
```text
/vercel-obs Run a 10 minute live observability session for this repo: start live collection, set up the tunnel, create a temporary drain, collect logs, clean everything up, analyze the captured data, and write the report to .hermes/observability/reports/live-session.md.
```
```text
/vercel-obs Run preflight first, then execute a 5 minute live session against production using only serverless and edge-function logs. Summarize the top bug candidates in chat and save the full report under .hermes/observability/reports/incident-review.md.
```
## Guardrails
- Prefer read-only investigation unless the user explicitly asks for fixes
- Redact obvious secrets and tokens in reports
- Keep time windows narrow by default
- Use sampling for high-volume logs
- If drain creation fails, surface the Vercel API error and fall back to runtime-log analysis

View File

@@ -0,0 +1,40 @@
# Vercel Notes
These notes are kept separate so the skill file itself can stay short.
## Current Assumptions
- `vercel logs --json` is the structured runtime-log path for `v1`
- `vercel api` is used for drain CRUD operations
- The drain endpoints are under `/v1/drains`
- The receiver must be publicly reachable for Vercel to deliver drain traffic
- Drain signature verification uses HMAC-SHA1 over the raw request body and compares against `x-vercel-signature` (sketched after this list)
- For the one-shot `live-session` flow, tunnel setup is automated with `cloudflared` first and `ngrok` as fallback when available
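A minimal sketch of that signature check, matching the hashing the test suite exercises (`hmac.compare_digest` is the standard constant-time comparison):
```python
# HMAC-SHA1 of the raw body, compared against x-vercel-signature.
import hmac

def verify_signature(body: bytes, signature: str, secret: str) -> bool:
    expected = hmac.new(secret.encode("utf-8"), body, "sha1").hexdigest()
    return hmac.compare_digest(expected, signature)
```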
## Practical Defaults
- Use runtime logs first for immediate signal
- Use drains only for live capture or longer windows
- Store normalized logs locally in SQLite for `v1`
- Use the repo root as the code-correlation root
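Correlation can start as simple filename matching. The tests only require that a log path like `/api/orders` surfaces `app/api/orders.ts`; the heuristic below is one assumed way to get there:
```python
# Naive path-to-file correlation sketch; the heuristic itself is an assumption.
from pathlib import Path

def likely_files(repo_root: Path, request_path: str, limit: int = 5) -> list[str]:
    stem = request_path.strip("/").split("/")[-1] or "index"
    return [
        str(p.relative_to(repo_root))
        for p in sorted(repo_root.rglob(f"{stem}.*"))
        if p.is_file()
    ][:limit]
```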
## Useful CLI Commands
```bash
vercel whoami
vercel logs --json --since 30m
vercel api /v1/drains
vercel api /v1/drains -X POST --input payload.json
vercel api /v1/drains/{id} -X DELETE
```
## Suggested Sources
Reasonable first-pass source sets for a general web app:
- `serverless`
- `edge-function`
- `edge-middleware`
- `static`
Tune the sources down if the project is noisy.

View File

@@ -0,0 +1,444 @@
from __future__ import annotations

import importlib.util
import json
import sqlite3
import sys
from pathlib import Path

SCRIPT_PATH = (
    Path(__file__).resolve().parents[2]
    / "optional-skills"
    / "observability"
    / "vercel-observability-loop"
    / "scripts"
    / "vercel_observability.py"
)


def load_module():
    spec = importlib.util.spec_from_file_location("vercel_observability_skill", SCRIPT_PATH)
    module = importlib.util.module_from_spec(spec)
    assert spec.loader is not None
    sys.modules[spec.name] = module
    spec.loader.exec_module(module)
    return module


def test_preflight_reads_vercel_linked_project(tmp_path: Path, monkeypatch):
    mod = load_module()
    project_dir = tmp_path / "app"
    (project_dir / ".vercel").mkdir(parents=True)
    (project_dir / ".vercel" / "project.json").write_text(
        json.dumps(
            {
                "projectId": "prj_123",
                "orgId": "team_456",
                "projectName": "demo-app",
            }
        ),
        encoding="utf-8",
    )
    (project_dir / "vercel.json").write_text("{}", encoding="utf-8")
    monkeypatch.setattr(mod.shutil, "which", lambda name: "/opt/homebrew/bin/vercel")

    def fake_run(cmd, **kwargs):
        joined = " ".join(cmd)
        if joined == "vercel --version":
            return mod.subprocess.CompletedProcess(cmd, 0, stdout="Vercel CLI 50.31.0\n", stderr="")
        if joined == "vercel --help":
            return mod.subprocess.CompletedProcess(cmd, 0, stdout="Commands:\n api\n", stderr="")
        if joined == "vercel whoami --no-color --non-interactive":
            return mod.subprocess.CompletedProcess(cmd, 0, stdout="rewbs\n", stderr="")
        raise AssertionError(f"Unexpected command: {cmd}")

    monkeypatch.setattr(mod, "run_command", fake_run)
    result = mod.run_preflight(project_dir)
    assert result["project"]["linked"] is True
    assert result["project"]["project_id"] == "prj_123"
    assert result["cli"]["logged_in"] is True
    assert result["recommended_mode"] == "runtime-or-drain"


def test_collect_runtime_logs_persists_json_lines(tmp_path: Path, monkeypatch):
    mod = load_module()
    runtime_paths = mod.resolve_paths(tmp_path / ".hermes" / "observability")
    sample_lines = "\n".join(
        [
            json.dumps(
                {
                    "timestamp": "2026-03-16T01:02:03Z",
                    "level": "error",
                    "message": "Database timeout for /api/orders",
                    "path": "/api/orders",
                    "statusCode": 500,
                    "requestId": "req_123",
                    "source": "serverless",
                }
            ),
            json.dumps(
                {
                    "timestamp": "2026-03-16T01:04:05Z",
                    "level": "info",
                    "message": "render home page",
                    "path": "/",
                    "statusCode": 200,
                    "requestId": "req_456",
                    "source": "edge-function",
                }
            ),
        ]
    )

    def fake_run(cmd, **kwargs):
        return mod.subprocess.CompletedProcess(cmd, 0, stdout=sample_lines, stderr="")

    monkeypatch.setattr(mod, "run_command", fake_run)
    result = mod.collect_runtime_logs(
        cwd=tmp_path,
        base_dir=runtime_paths["state_dir"],
        since="30m",
        until=None,
        project=None,
        environment=None,
        level=None,
        source=None,
        limit=100,
        search=None,
        request_id=None,
        status_code=None,
    )
    assert result["success"] is True
    assert result["stored"] == 2
    conn = sqlite3.connect(runtime_paths["db_path"])
    try:
        count = conn.execute("SELECT COUNT(*) FROM log_events").fetchone()[0]
    finally:
        conn.close()
    assert count == 2


def test_verify_signature_uses_hmac_sha1():
    mod = load_module()
    body = b'{"message":"hello"}'
    secret = "shared-secret"
    signature = mod.hmac.new(secret.encode("utf-8"), body, "sha1").hexdigest()
    assert mod.verify_signature(body, signature, secret) is True
    assert mod.verify_signature(body, "bad-signature", secret) is False


def test_build_drain_payload_includes_self_served_source():
    mod = load_module()
    payload = mod.build_drain_payload(
        name="hermes-observability",
        target_url="https://example.trycloudflare.com",
        project_id="prj_123",
        sources=["serverless", "static"],
        headers={"X-Test": "1"},
        secret="secret",
        delivery_format="json",
    )
    assert payload["name"] == "hermes-observability"
    assert payload["projectIds"] == ["prj_123"]
    assert payload["source"] == {"kind": "self-served"}
    assert payload["sources"] == ["serverless", "static"]
    assert payload["headers"]["X-Test"] == "1"


def test_normalize_log_record_handles_vercel_millisecond_timestamps_and_empty_messages():
    mod = load_module()
    record = mod.normalize_log_record(
        {
            "id": "4chxq-1773620260046-25aa70eb0443",
            "timestamp": 1773620260046,
            "deploymentId": "dpl_123",
            "projectId": "prj_123",
            "level": "info",
            "message": "",
            "source": "serverless",
            "domain": "portal.nousresearch.com",
            "requestMethod": "POST",
            "requestPath": "/refresh",
            "responseStatusCode": 0,
            "environment": "production",
            "traceId": "",
        },
        "runtime",
    )
    assert record["observed_at"] == "2026-03-16T00:17:40.046000Z"
    assert record["path"] == "/refresh"
    assert record["host"] == "portal.nousresearch.com"
    assert record["status_code"] == 0
    assert record["message"] == "POST /refresh -> 0 serverless"
    assert record["request_id"] == "4chxq-1773620260046-25aa70eb0443"


def test_analyze_rows_flags_noisy_and_missing_context(tmp_path: Path):
    mod = load_module()
    repo_root = tmp_path / "repo"
    (repo_root / "app" / "api").mkdir(parents=True)
    (repo_root / "app" / "api" / "orders.ts").write_text("export function handler() {}", encoding="utf-8")
    rows = []
    for index in range(12):
        rows.append(
            {
                "fingerprint": "noise",
                "origin": "runtime",
                "source": "edge-function",
                "level": "info",
                "status_code": 200,
                "request_id": f"req_{index}",
                "deployment_id": None,
                "environment": "preview",
                "path": "/",
                "host": None,
                "message": "Rendered landing page",
                "raw_json": "{}",
            }
        )
    rows.append(
        {
            "fingerprint": "bug",
            "origin": "runtime",
            "source": "serverless",
            "level": "error",
            "status_code": 500,
            "request_id": None,
            "deployment_id": None,
            "environment": "production",
            "path": "/api/orders",
            "host": None,
            "message": "Internal Server Error",
            "raw_json": "{}",
        }
    )
    analysis = mod.analyze_rows(rows, repo_root, sample_limit=3)
    assert analysis["summary"]["bug_candidates"] >= 1
    assert analysis["summary"]["noisy_log_candidates"] >= 1
    assert analysis["summary"]["missing_context_candidates"] >= 1
    assert any("orders.ts" in ",".join(item["likely_files"]) for item in analysis["bug_candidates"])


def test_live_session_runs_end_to_end_and_scopes_analysis(tmp_path: Path, monkeypatch):
    mod = load_module()
    runtime_paths = mod.resolve_paths(tmp_path / ".hermes" / "observability")
    calls: dict[str, object] = {}
    monkeypatch.setattr(
        mod,
        "run_preflight",
        lambda cwd: {
            "success": True,
            "cli": {"installed": True, "logged_in": True, "api_supported": True},
            "project": {"project_id": "prj_123"},
        },
    )
    monkeypatch.setattr(
        mod,
        "start_receiver_background",
        lambda **kwargs: {
            "success": True,
            "server": object(),
            "thread": object(),
            "startup": {
                "listening": "http://127.0.0.1:4319",
                "port": 4319,
                "db_path": str(runtime_paths["db_path"]),
                "raw_dir": str(runtime_paths["raw_dir"]),
            },
        },
    )
    monkeypatch.setattr(
        mod,
        "start_tunnel",
        lambda **kwargs: {
            "success": True,
            "provider": "cloudflared",
            "public_url": "https://demo.trycloudflare.com",
            "command": ["cloudflared", "tunnel"],
            "process": object(),
            "reader_thread": object(),
        },
    )
    monkeypatch.setattr(
        mod,
        "ensure_drain",
        lambda **kwargs: {
            "success": True,
            "action": "create",
            "response": {"json": {"id": "drn_123"}},
        },
    )
    row_ids = iter([10, 16])
    monkeypatch.setattr(mod, "get_max_row_id", lambda db_path: next(row_ids))
    monkeypatch.setattr(mod.time, "sleep", lambda seconds: calls.setdefault("slept", seconds))
    monkeypatch.setattr(
        mod,
        "delete_drain",
        lambda **kwargs: {"success": True, "deleted": kwargs["drain_id"]},
    )
    monkeypatch.setattr(mod, "stop_tunnel", lambda process, reader_thread: {"success": True, "status": "stopped"})
    monkeypatch.setattr(mod, "stop_receiver_background", lambda server, thread: {"success": True, "status": "stopped"})

    def fake_analyze_database(**kwargs):
        calls["analyze_kwargs"] = kwargs
        return {
            "success": True,
            "report_path": str(kwargs["report_path"]),
            "analysis": {
                "summary": {
                    "records": 2,
                    "clusters": 1,
                    "bug_candidates": 1,
                    "noisy_log_candidates": 0,
                    "missing_context_candidates": 0,
                }
            },
        }

    monkeypatch.setattr(mod, "analyze_database", fake_analyze_database)
    result = mod.run_live_session(
        cwd=tmp_path,
        base_dir=runtime_paths["state_dir"],
        minutes=0.05,
        bind="127.0.0.1",
        port=4319,
        secret="shared-secret",
        name_prefix="session",
        project_id=None,
        scope=None,
        sources=["serverless"],
        headers=None,
        delivery_format="json",
        tunnel="auto",
        tunnel_timeout=10.0,
        environment="production",
        limit=250,
        sample_limit=15,
        report_path=None,
    )
    assert result["success"] is True
    assert calls["slept"] == 3.0
    assert result["session"]["drain_id"] == "drn_123"
    assert result["session"]["drain_name"].startswith("session-")
    assert result["cleanup"]["drain"]["deleted"] == "drn_123"
    analyze_kwargs = calls["analyze_kwargs"]
    assert analyze_kwargs["origins"] == ["drain"]
    assert analyze_kwargs["min_row_id"] == 10
    assert analyze_kwargs["max_row_id"] == 16
    assert analyze_kwargs["environment"] == "production"
    assert analyze_kwargs["limit"] == 250
    assert analyze_kwargs["sample_limit"] == 15
    assert analyze_kwargs["report_path"].name.startswith("live-session-")


def test_live_session_cleans_up_receiver_and_tunnel_when_drain_creation_fails(tmp_path: Path, monkeypatch):
    mod = load_module()
    runtime_paths = mod.resolve_paths(tmp_path / ".hermes" / "observability")
    calls = {"stop_tunnel": 0, "stop_receiver": 0, "analyze": 0}
    monkeypatch.setattr(
        mod,
        "run_preflight",
        lambda cwd: {
            "success": True,
            "cli": {"installed": True, "logged_in": True, "api_supported": True},
            "project": {"project_id": "prj_123"},
        },
    )
    monkeypatch.setattr(
        mod,
        "start_receiver_background",
        lambda **kwargs: {
            "success": True,
            "server": object(),
            "thread": object(),
            "startup": {
                "listening": "http://127.0.0.1:4319",
                "port": 4319,
                "db_path": str(runtime_paths["db_path"]),
                "raw_dir": str(runtime_paths["raw_dir"]),
            },
        },
    )
    monkeypatch.setattr(
        mod,
        "start_tunnel",
        lambda **kwargs: {
            "success": True,
            "provider": "cloudflared",
            "public_url": "https://demo.trycloudflare.com",
            "command": ["cloudflared", "tunnel"],
            "process": object(),
            "reader_thread": object(),
        },
    )
    monkeypatch.setattr(
        mod,
        "ensure_drain",
        lambda **kwargs: {"success": False, "phase": "create", "response": {"stderr": "boom"}},
    )
    monkeypatch.setattr(mod, "get_max_row_id", lambda db_path: 0)
    monkeypatch.setattr(
        mod,
        "stop_tunnel",
        lambda process, reader_thread: calls.__setitem__("stop_tunnel", calls["stop_tunnel"] + 1) or {"success": True},
    )
    monkeypatch.setattr(
        mod,
        "stop_receiver_background",
        lambda server, thread: calls.__setitem__("stop_receiver", calls["stop_receiver"] + 1) or {"success": True},
    )
    monkeypatch.setattr(mod, "delete_drain", lambda **kwargs: (_ for _ in ()).throw(AssertionError("delete_drain should not be called")))
    monkeypatch.setattr(
        mod,
        "analyze_database",
        lambda **kwargs: calls.__setitem__("analyze", calls["analyze"] + 1) or {"success": True},
    )
    result = mod.run_live_session(
        cwd=tmp_path,
        base_dir=runtime_paths["state_dir"],
        minutes=0.01,
        bind="127.0.0.1",
        port=4319,
        secret="shared-secret",
        name_prefix="session",
        project_id=None,
        scope=None,
        sources=["serverless"],
        headers=None,
        delivery_format="json",
        tunnel="auto",
        tunnel_timeout=10.0,
        environment=None,
        limit=None,
        sample_limit=20,
        report_path=None,
    )
    assert result["success"] is False
    assert result["phase"] == "ensure-drain"
    assert calls["stop_tunnel"] == 1
    assert calls["stop_receiver"] == 1
    assert calls["analyze"] == 0

View File

@@ -68,6 +68,22 @@ class TestAtomicJsonWrite:
         tmp_files = [f for f in tmp_path.iterdir() if ".tmp" in f.name]
         assert len(tmp_files) == 0
 
+    def test_cleans_up_temp_file_on_baseexception(self, tmp_path):
+        class SimulatedAbort(BaseException):
+            pass
+
+        target = tmp_path / "data.json"
+        original = {"preserved": True}
+        target.write_text(json.dumps(original), encoding="utf-8")
+        with patch("utils.json.dump", side_effect=SimulatedAbort):
+            with pytest.raises(SimulatedAbort):
+                atomic_json_write(target, {"new": True})
+        tmp_files = [f for f in tmp_path.iterdir() if ".tmp" in f.name]
+        assert len(tmp_files) == 0
+        assert json.loads(target.read_text(encoding="utf-8")) == original
+
     def test_accepts_string_path(self, tmp_path):
         target = str(tmp_path / "string_path.json")
         atomic_json_write(target, {"string": True})

View File

@@ -0,0 +1,44 @@
"""Tests for utils.atomic_yaml_write — crash-safe YAML file writes."""
from pathlib import Path
from unittest.mock import patch

import pytest
import yaml

from utils import atomic_yaml_write


class TestAtomicYamlWrite:
    def test_writes_valid_yaml(self, tmp_path):
        target = tmp_path / "data.yaml"
        data = {"key": "value", "nested": {"a": 1}}
        atomic_yaml_write(target, data)
        assert yaml.safe_load(target.read_text(encoding="utf-8")) == data

    def test_cleans_up_temp_file_on_baseexception(self, tmp_path):
        class SimulatedAbort(BaseException):
            pass

        target = tmp_path / "data.yaml"
        original = {"preserved": True}
        target.write_text(yaml.safe_dump(original), encoding="utf-8")
        with patch("utils.yaml.dump", side_effect=SimulatedAbort):
            with pytest.raises(SimulatedAbort):
                atomic_yaml_write(target, {"new": True})
        tmp_files = [f for f in tmp_path.iterdir() if ".tmp" in f.name]
        assert len(tmp_files) == 0
        assert yaml.safe_load(target.read_text(encoding="utf-8")) == original

    def test_appends_extra_content(self, tmp_path):
        target = tmp_path / "data.yaml"
        atomic_yaml_write(target, {"key": "value"}, extra_content="\n# comment\n")
        text = target.read_text(encoding="utf-8")
        assert "key: value" in text
        assert "# comment" in text

View File

@@ -1,8 +1,10 @@
 """Tests for tools/checkpoint_manager.py — CheckpointManager."""
+import logging
 import os
 import json
 import shutil
+import subprocess
 import pytest
 from pathlib import Path
 from unittest.mock import patch
@@ -143,6 +145,12 @@ class TestTakeCheckpoint:
         result = mgr.ensure_checkpoint(str(work_dir), "initial")
         assert result is True
 
+    def test_successful_checkpoint_does_not_log_expected_diff_exit(self, mgr, work_dir, caplog):
+        with caplog.at_level(logging.ERROR, logger="tools.checkpoint_manager"):
+            result = mgr.ensure_checkpoint(str(work_dir), "initial")
+        assert result is True
+        assert not any("diff --cached --quiet" in r.getMessage() for r in caplog.records)
+
     def test_dedup_same_turn(self, mgr, work_dir):
         r1 = mgr.ensure_checkpoint(str(work_dir), "first")
         r2 = mgr.ensure_checkpoint(str(work_dir), "second")
@@ -375,6 +383,26 @@ class TestErrorResilience:
         result = mgr.ensure_checkpoint(str(work_dir), "test")
         assert result is False
 
+    def test_run_git_allows_expected_nonzero_without_error_log(self, tmp_path, caplog):
+        completed = subprocess.CompletedProcess(
+            args=["git", "diff", "--cached", "--quiet"],
+            returncode=1,
+            stdout="",
+            stderr="",
+        )
+        with patch("tools.checkpoint_manager.subprocess.run", return_value=completed):
+            with caplog.at_level(logging.ERROR, logger="tools.checkpoint_manager"):
+                ok, stdout, stderr = _run_git(
+                    ["diff", "--cached", "--quiet"],
+                    tmp_path / "shadow",
+                    str(tmp_path / "work"),
+                    allowed_returncodes={1},
+                )
+        assert ok is False
+        assert stdout == ""
+        assert stderr == ""
+        assert not caplog.records
+
     def test_checkpoint_failure_does_not_raise(self, mgr, work_dir, monkeypatch):
         """Checkpoint failures should never raise — they're silently logged."""
         def broken_run_git(*args, **kwargs):

View File

@@ -5,6 +5,7 @@ handling without requiring a running terminal environment.
 """
 import json
+import logging
 from unittest.mock import MagicMock, patch
 
 from tools.file_tools import (
@@ -87,13 +88,26 @@ class TestWriteFileHandler:
         mock_ops.write_file.assert_called_once_with("/tmp/out.txt", "hello world!\n")
 
     @patch("tools.file_tools._get_file_ops")
-    def test_exception_returns_error_json(self, mock_get):
+    def test_permission_error_returns_error_json_without_error_log(self, mock_get, caplog):
         mock_get.side_effect = PermissionError("read-only filesystem")
         from tools.file_tools import write_file_tool
-        result = json.loads(write_file_tool("/tmp/out.txt", "data"))
+        with caplog.at_level(logging.DEBUG, logger="tools.file_tools"):
+            result = json.loads(write_file_tool("/tmp/out.txt", "data"))
         assert "error" in result
         assert "read-only" in result["error"]
+        assert any("write_file expected denial" in r.getMessage() for r in caplog.records)
+        assert not any(r.levelno >= logging.ERROR for r in caplog.records)
+
+    @patch("tools.file_tools._get_file_ops")
+    def test_unexpected_exception_still_logs_error(self, mock_get, caplog):
+        mock_get.side_effect = RuntimeError("boom")
+        from tools.file_tools import write_file_tool
+        with caplog.at_level(logging.ERROR, logger="tools.file_tools"):
+            result = json.loads(write_file_tool("/tmp/out.txt", "data"))
+        assert result["error"] == "boom"
+        assert any("write_file error" in r.getMessage() for r in caplog.records)
 
 class TestPatchHandler:

View File

@@ -92,10 +92,17 @@ def _run_git(
     shadow_repo: Path,
     working_dir: str,
     timeout: int = _GIT_TIMEOUT,
+    allowed_returncodes: Optional[Set[int]] = None,
 ) -> tuple:
-    """Run a git command against the shadow repo. Returns (ok, stdout, stderr)."""
+    """Run a git command against the shadow repo. Returns (ok, stdout, stderr).
+
+    ``allowed_returncodes`` suppresses error logging for known/expected non-zero
+    exits while preserving the normal ``ok = (returncode == 0)`` contract.
+    Example: ``git diff --cached --quiet`` returns 1 when changes exist.
+    """
     env = _git_env(shadow_repo, working_dir)
     cmd = ["git"] + list(args)
+    allowed_returncodes = allowed_returncodes or set()
     try:
         result = subprocess.run(
             cmd,
@@ -108,7 +115,7 @@
         ok = result.returncode == 0
         stdout = result.stdout.strip()
         stderr = result.stderr.strip()
-        if not ok:
+        if not ok and result.returncode not in allowed_returncodes:
             logger.error(
                 "Git command failed: %s (rc=%d) stderr=%s",
                 " ".join(cmd), result.returncode, stderr,
@@ -381,7 +388,10 @@ class CheckpointManager:
         # Check if there's anything to commit
         ok_diff, diff_out, _ = _run_git(
-            ["diff", "--cached", "--quiet"], shadow, working_dir,
+            ["diff", "--cached", "--quiet"],
+            shadow,
+            working_dir,
+            allowed_returncodes={1},
         )
         if ok_diff:
             # No changes to commit

View File

@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 """File Tools Module - LLM agent file manipulation tools."""
+import errno
 import json
 import logging
 import os
@@ -11,6 +12,18 @@ from agent.redact import redact_sensitive_text
 logger = logging.getLogger(__name__)
 
+_EXPECTED_WRITE_ERRNOS = {errno.EACCES, errno.EPERM, errno.EROFS}
+
+
+def _is_expected_write_exception(exc: Exception) -> bool:
+    """Return True for expected write denials that should not hit error logs."""
+    if isinstance(exc, PermissionError):
+        return True
+    if isinstance(exc, OSError) and exc.errno in _EXPECTED_WRITE_ERRNOS:
+        return True
+    return False
+
+
 _file_ops_lock = threading.Lock()
 _file_ops_cache: dict = {}
@@ -238,7 +251,10 @@ def write_file_tool(path: str, content: str, task_id: str = "default") -> str:
         result = file_ops.write_file(path, content)
         return json.dumps(result.to_dict(), ensure_ascii=False)
     except Exception as e:
-        logger.error("write_file error: %s: %s", type(e).__name__, e)
+        if _is_expected_write_exception(e):
+            logger.debug("write_file expected denial: %s: %s", type(e).__name__, e)
+        else:
+            logger.error("write_file error: %s: %s", type(e).__name__, e, exc_info=True)
         return json.dumps({"error": str(e)}, ensure_ascii=False)

View File

@@ -50,6 +50,8 @@ def atomic_json_write(
             os.fsync(f.fileno())
         os.replace(tmp_path, path)
     except BaseException:
+        # Intentionally catch BaseException so temp-file cleanup still runs for
+        # KeyboardInterrupt/SystemExit before re-raising the original signal.
         try:
             os.unlink(tmp_path)
         except OSError:
@@ -96,6 +98,8 @@ def atomic_yaml_write(
             os.fsync(f.fileno())
         os.replace(tmp_path, path)
     except BaseException:
+        # Match atomic_json_write: cleanup must also happen for process-level
+        # interruptions before we re-raise them.
         try:
             os.unlink(tmp_path)
         except OSError: