Files
hermes-agent/tools/file_operations.py
Teknium 5168226d60 feat(file_tools): post-write delta lint on write_file + patch, add JSON/YAML/TOML/Python in-process linters (#20191)
Closes the gap where write_file skipped the post-edit syntax check that
patch already ran, so silent file corruption (bad quote escaping,
truncated writes, etc.) would persist on disk until a later read.

## Changes

tools/file_operations.py:
- Add in-process linters for .py, .json, .yaml, .toml (LINTERS_INPROC).
  Python uses ast.parse, JSON/YAML/TOML use stdlib/PyYAML parsers.
  Zero subprocess overhead; preferred over shell linters when both apply.
- _check_lint() now accepts optional content and routes to in-process
  linter first. Shell linter (py_compile, node --check, tsc, go vet,
  rustfmt) remains the fallback for languages without an in-process
  equivalent.
- New _check_lint_delta() implements the post-first/pre-lazy pattern
  borrowed from Cline and OpenCode (see the sketch after this list):
  lint the post-write state first; only if errors are found AND
  pre-content was captured does it lint the pre-state and diff the two.
  If the pre-edit file already had the SAME errors, the edit introduced
  nothing new, so the file is reported with success=False and a message
  explaining the errors are pre-existing. If the edit introduced
  genuinely new errors, those are surfaced and pre-existing ones are
  filtered out.
- WriteResult gains a lint field.
- write_file() captures pre-content for in-process-lintable extensions
  and calls _check_lint_delta after a successful write.
- patch_replace() switches from _check_lint to _check_lint_delta,
  reusing the pre-edit content it already has in scope.

tools/file_tools.py:
- Update write_file schema description to mention the post-write lint.

tests/tools/test_file_operations_edge_cases.py:
- Update existing brace-path tests to use .js (shell linter) now that
  .py is in-process.
- Add TestCheckLintInproc (9 tests) covering Python/JSON/YAML/TOML
  in-process linters.
- Add TestCheckLintDelta (5 tests) covering the post-first/pre-lazy
  short-circuit, new-file path, and the single-error-parser caveat.
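
For orientation, the post-first/pre-lazy delta described above as a minimal
standalone sketch. Names here are illustrative, and `lint` stands in for any
callable returning `(ok, error_text)`; the real implementation is
`_check_lint_delta()` in the source below:

```python
# Minimal sketch of the post-first / pre-lazy delta, assuming a lint()
# callable that returns (ok, error_text). Illustrative only.
def lint_delta(lint, pre, post):
    ok, post_err = lint(post)
    if ok:                      # hot path: clean write, exactly one parse
        return True, ""
    if pre is None:             # new file: no baseline, report everything
        return False, post_err
    pre_ok, pre_err = lint(pre)
    if pre_ok:                  # baseline was clean: every error is new
        return False, post_err
    seen = {ln.strip() for ln in pre_err.splitlines() if ln.strip()}
    new = [ln for ln in post_err.splitlines()
           if ln.strip() and ln.strip() not in seen]
    if not new:                 # inherited breakage, not fresh damage
        return False, "pre-existing: " + post_err
    return False, "\n".join(new)
```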

## Performance

In-process linters cost microseconds per call (ast.parse, json.loads).
The hot path (a clean write) runs exactly one lint, matching main's cost
for patch. Pre-state capture is skipped when the file has no applicable
linter. Measured 4.89ms/write average over 100 .py writes, lint included.
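
A rough way to sanity-check the microseconds claim, using plain `timeit`
over the stdlib parsers (inputs and iteration counts are arbitrary; results
vary by machine):

```python
import ast
import json
import timeit

SRC = "def f(x):\n    return x + 1\n"
DOC = '{"a": 1, "b": [2, 3]}'

# Per-call averages; single-digit microseconds is typical for inputs this small.
print(timeit.timeit(lambda: ast.parse(SRC), number=10_000) / 10_000)
print(timeit.timeit(lambda: json.loads(DOC), number=10_000) / 10_000)
```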

## Inspiration

- Cline's DiffViewProvider.getNewDiagnosticProblems() — filters pre-write
  diagnostics from post-write diagnostics (src/integrations/editor/DiffViewProvider.ts).
- OpenCode's WriteTool — runs lsp.diagnostics() after write and appends
  errors to tool output (packages/opencode/src/tool/write.ts).
- Claude Code's DiagnosticTrackingService — captures baseline via
  beforeFileEdited() and returns new-diagnostics-only from
  getNewDiagnostics() (src/services/diagnosticTracking.ts).

## Validation

- tests/tools/test_file_operations.py + test_file_operations_edge_cases.py
  + test_file_tools.py + test_file_tools_live.py + test_file_write_safety.py
  + test_write_deny.py + test_patch_parser.py + test_file_ops_cwd_tracking.py:
  228 passed locally.
- Live E2E reproduction of the tips.py corruption incident: broken
  content written; lint field surfaces 'SyntaxError: invalid syntax.
  Perhaps you forgot a comma? (line 6, column 5)' — the exact error
  that would have self-corrected the bug on the next turn (lint shape
  shown below).
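
For reference, the shape the lint field takes on a failed write,
reconstructed from LintResult.to_dict() in the source below; the message
text is the one quoted above:

```python
result = file_ops.write_file("tips.py", broken_content)
# result.to_dict()["lint"] ==
# {"status": "error",
#  "output": "SyntaxError: invalid syntax. Perhaps you forgot a comma? "
#            "(line 6, column 5)"}
```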
2026-05-05 04:54:17 -07:00


#!/usr/bin/env python3
"""
File Operations Module

Provides file manipulation capabilities (read, write, patch, search) that work
across all terminal backends (local, docker, singularity, ssh, modal, daytona).

The key insight is that all file operations can be expressed as shell commands,
so we wrap the terminal backend's execute() interface to provide a unified file API.

Usage:
    from tools.file_operations import ShellFileOperations
    from tools.terminal_tool import _active_environments

    # Get file operations for a terminal environment
    file_ops = ShellFileOperations(terminal_env)

    # Read a file
    result = file_ops.read_file("/path/to/file.py")

    # Write a file
    result = file_ops.write_file("/path/to/new.py", "print('hello')")

    # Search for content
    result = file_ops.search("TODO", path=".", file_glob="*.py")
"""
import os
import re
import difflib
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from pathlib import Path

from tools.binary_extensions import BINARY_EXTENSIONS
from agent.file_safety import (
    build_write_denied_paths,
    build_write_denied_prefixes,
    get_safe_write_root as _shared_get_safe_write_root,
    is_write_denied as _shared_is_write_denied,
)

# ---------------------------------------------------------------------------
# Write-path deny list — blocks writes to sensitive system/credential files
# ---------------------------------------------------------------------------
_HOME = str(Path.home())
WRITE_DENIED_PATHS = build_write_denied_paths(_HOME)
WRITE_DENIED_PREFIXES = build_write_denied_prefixes(_HOME)

_OSC_SEQUENCE_RE = re.compile(r"\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)")
_FENCE_MARKER_RE = re.compile(r"'?\x07?__HERMES_FENCE_[A-Za-z0-9]+__\x07?'?")


def _strip_terminal_fence_leaks(text: str) -> str:
    """Strip leaked terminal fence wrappers from file read output."""
    if not text:
        return text
    cleaned_lines: List[str] = []
    for line in text.splitlines(keepends=True):
        had_terminal_wrapper = "__HERMES_FENCE_" in line or "\x1b]" in line
        cleaned = _OSC_SEQUENCE_RE.sub("", line)
        cleaned = _FENCE_MARKER_RE.sub("", cleaned)
        cleaned = cleaned.replace("\x07", "")
        if had_terminal_wrapper and cleaned.strip("'\r\n\t ") == "":
            continue
        cleaned_lines.append(cleaned)
    return "".join(cleaned_lines)
def _get_safe_write_root() -> Optional[str]:
    """Return the resolved HERMES_WRITE_SAFE_ROOT path, or None if unset.

    When set, all write_file/patch operations are constrained to this
    directory tree. Writes outside it are denied even if the target is
    not on the static deny list. Opt-in hardening for gateway/messaging
    deployments that should only touch a workspace checkout.
    """
    return _shared_get_safe_write_root()


def _is_write_denied(path: str) -> bool:
    """Return True if path is on the write deny list."""
    return _shared_is_write_denied(path)


# =============================================================================
# Result Data Classes
# =============================================================================

@dataclass
class ReadResult:
    """Result from reading a file."""
    content: str = ""
    total_lines: int = 0
    file_size: int = 0
    truncated: bool = False
    hint: Optional[str] = None
    is_binary: bool = False
    is_image: bool = False
    base64_content: Optional[str] = None
    mime_type: Optional[str] = None
    dimensions: Optional[str] = None  # For images: "WIDTHxHEIGHT"
    error: Optional[str] = None
    similar_files: List[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        return {k: v for k, v in self.__dict__.items() if v is not None and v != []}


@dataclass
class WriteResult:
    """Result from writing a file."""
    bytes_written: int = 0
    dirs_created: bool = False
    lint: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    warning: Optional[str] = None

    def to_dict(self) -> dict:
        return {k: v for k, v in self.__dict__.items() if v is not None}


@dataclass
class PatchResult:
    """Result from patching a file."""
    success: bool = False
    diff: str = ""
    files_modified: List[str] = field(default_factory=list)
    files_created: List[str] = field(default_factory=list)
    files_deleted: List[str] = field(default_factory=list)
    lint: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

    def to_dict(self) -> dict:
        result = {"success": self.success}
        if self.diff:
            result["diff"] = self.diff
        if self.files_modified:
            result["files_modified"] = self.files_modified
        if self.files_created:
            result["files_created"] = self.files_created
        if self.files_deleted:
            result["files_deleted"] = self.files_deleted
        if self.lint:
            result["lint"] = self.lint
        if self.error:
            result["error"] = self.error
        return result


@dataclass
class SearchMatch:
    """A single search match."""
    path: str
    line_number: int
    content: str
    mtime: float = 0.0  # Modification time for sorting


@dataclass
class SearchResult:
    """Result from searching."""
    matches: List[SearchMatch] = field(default_factory=list)
    files: List[str] = field(default_factory=list)
    counts: Dict[str, int] = field(default_factory=dict)
    total_count: int = 0
    truncated: bool = False
    error: Optional[str] = None

    def to_dict(self) -> dict:
        result = {"total_count": self.total_count}
        if self.matches:
            result["matches"] = [
                {"path": m.path, "line": m.line_number, "content": m.content}
                for m in self.matches
            ]
        if self.files:
            result["files"] = self.files
        if self.counts:
            result["counts"] = self.counts
        if self.truncated:
            result["truncated"] = True
        if self.error:
            result["error"] = self.error
        return result


@dataclass
class LintResult:
    """Result from linting a file."""
    success: bool = True
    skipped: bool = False
    output: str = ""
    message: str = ""

    def to_dict(self) -> dict:
        if self.skipped:
            return {"status": "skipped", "message": self.message}
        result = {"status": "ok" if self.success else "error", "output": self.output}
        if self.message:
            result["message"] = self.message
        return result


@dataclass
class ExecuteResult:
    """Result from executing a shell command."""
    stdout: str = ""
    exit_code: int = 0


def _parse_search_context_line(line: str) -> tuple[str, int, str] | None:
    """Parse grep/rg context output in ``path-line-content`` format.

    Context lines are ambiguous because filenames may legitimately contain
    ``-<digits>-`` segments. Prefer the rightmost numeric separator so a path
    like ``dir/file-12-name.py-8-context`` resolves to
    ``dir/file-12-name.py`` line ``8`` instead of truncating at ``file``.
    """
    if not line or line == "--":
        return None
    match = None
    for candidate in re.finditer(r'-(\d+)-', line):
        match = candidate
    if match is None:
        return None
    path = line[:match.start()]
    if not path:
        return None
    return path, int(match.group(1)), line[match.end():]


# =============================================================================
# Abstract Interface
# =============================================================================

class FileOperations(ABC):
    """Abstract interface for file operations across terminal backends."""

    @abstractmethod
    def read_file(self, path: str, offset: int = 1, limit: int = 500) -> ReadResult:
        """Read a file with pagination support."""
        ...

    @abstractmethod
    def read_file_raw(self, path: str) -> ReadResult:
        """Read the complete file content as a plain string.

        No pagination, no line-number prefixes, no per-line truncation.
        Returns ReadResult with .content = full file text, .error set on
        failure. Always reads to EOF regardless of file size.
        """
        ...

    @abstractmethod
    def write_file(self, path: str, content: str) -> WriteResult:
        """Write content to a file, creating directories as needed."""
        ...

    @abstractmethod
    def patch_replace(self, path: str, old_string: str, new_string: str,
                      replace_all: bool = False) -> PatchResult:
        """Replace text in a file using fuzzy matching."""
        ...

    @abstractmethod
    def patch_v4a(self, patch_content: str) -> PatchResult:
        """Apply a V4A format patch."""
        ...

    @abstractmethod
    def delete_file(self, path: str) -> WriteResult:
        """Delete a file. Returns WriteResult with .error set on failure."""
        ...

    @abstractmethod
    def move_file(self, src: str, dst: str) -> WriteResult:
        """Move/rename a file from src to dst. Returns WriteResult with .error set on failure."""
        ...

    @abstractmethod
    def search(self, pattern: str, path: str = ".", target: str = "content",
               file_glob: Optional[str] = None, limit: int = 50, offset: int = 0,
               output_mode: str = "content", context: int = 0) -> SearchResult:
        """Search for content or files."""
        ...


# =============================================================================
# Shell-based Implementation
# =============================================================================

# Image extensions (subset of binary that we can return as base64)
IMAGE_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp', '.ico'}

# Shell-based linters by file extension. Invoked via _exec() with the
# filesystem path. Cover languages where a compile/type check needs an
# external toolchain (py_compile, node, tsc, go vet, rustfmt).
LINTERS = {
    '.py': 'python -m py_compile {file} 2>&1',
    '.js': 'node --check {file} 2>&1',
    '.ts': 'npx tsc --noEmit {file} 2>&1',
    '.go': 'go vet {file} 2>&1',
    '.rs': 'rustfmt --check {file} 2>&1',
}


def _lint_json_inproc(content: str) -> tuple[bool, str]:
    """In-process JSON syntax check. Returns (ok, error_message)."""
    import json as _json
    try:
        _json.loads(content)
        return True, ""
    except _json.JSONDecodeError as e:
        return False, f"JSONDecodeError: {e.msg} (line {e.lineno}, column {e.colno})"
    except Exception as e:  # noqa: BLE001 — any parse failure is a lint failure
        return False, f"{type(e).__name__}: {e}"


def _lint_yaml_inproc(content: str) -> tuple[bool, str]:
    """In-process YAML syntax check. Returns (ok, error_message).

    Skipped gracefully if PyYAML isn't installed — YAML parsing is optional.
    """
    try:
        import yaml as _yaml
    except ImportError:
        # PyYAML not available — skip silently, caller treats as no linter.
        return True, "__SKIP__"
    try:
        _yaml.safe_load(content)
        return True, ""
    except _yaml.YAMLError as e:
        return False, f"YAMLError: {e}"
    except Exception as e:  # noqa: BLE001
        return False, f"{type(e).__name__}: {e}"


def _lint_toml_inproc(content: str) -> tuple[bool, str]:
    """In-process TOML syntax check (stdlib tomllib, Python 3.11+)."""
    try:
        import tomllib as _toml
    except ImportError:
        # Pre-3.11 fallback via tomli, if installed.
        try:
            import tomli as _toml  # type: ignore[no-redef]
        except ImportError:
            return True, "__SKIP__"
    try:
        _toml.loads(content)
        return True, ""
    except Exception as e:  # tomllib raises TOMLDecodeError, a ValueError subclass
        return False, f"{type(e).__name__}: {e}"


def _lint_python_inproc(content: str) -> tuple[bool, str]:
    """In-process Python syntax check via ast.parse.

    Catches SyntaxError, IndentationError, and everything else the
    ast module rejects — matching py_compile's scope but with no
    subprocess overhead and no dependency on a ``python`` in PATH.
    """
    import ast as _ast
    try:
        _ast.parse(content)
        return True, ""
    except SyntaxError as e:
        loc = f" (line {e.lineno}, column {e.offset})" if e.lineno else ""
        return False, f"{type(e).__name__}: {e.msg}{loc}"
    except Exception as e:  # noqa: BLE001
        return False, f"{type(e).__name__}: {e}"


# In-process linters by file extension. Preferred over shell linters when
# present — no subprocess overhead, microseconds per call. Each callable
# takes file content (str) and returns (ok: bool, error: str). An error
# string of ``"__SKIP__"`` signals the linter isn't available (missing
# dependency) and should be treated as "no linter".
LINTERS_INPROC = {
    '.py': _lint_python_inproc,
    '.json': _lint_json_inproc,
    '.yaml': _lint_yaml_inproc,
    '.yml': _lint_yaml_inproc,
    '.toml': _lint_toml_inproc,
}
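
# Contract example (illustrative, editorial note): LINTERS_INPROC['.json']('{"a": 1}')
# returns (True, ""), while LINTERS_INPROC['.json']('{"a": 1,}') returns (False,
# "JSONDecodeError: Expecting property name enclosed in double quotes (line 1, column 9)").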

# Max limits for read operations
MAX_LINES = 2000
MAX_LINE_LENGTH = 2000
MAX_FILE_SIZE = 50 * 1024  # 50KB

DEFAULT_READ_OFFSET = 1
DEFAULT_READ_LIMIT = 500
DEFAULT_SEARCH_OFFSET = 0
DEFAULT_SEARCH_LIMIT = 50


def _coerce_int(value: Any, default: int) -> int:
    """Best-effort integer coercion for tool pagination inputs."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def normalize_read_pagination(offset: Any = DEFAULT_READ_OFFSET,
                              limit: Any = DEFAULT_READ_LIMIT) -> tuple[int, int]:
    """Return safe read_file pagination bounds.

    Tool schemas declare minimum/maximum values, but not every caller or
    provider enforces schemas before dispatch. Clamp here so invalid values
    cannot leak into sed ranges like ``0,-1p``.

    The upper bound on ``limit`` comes from ``tool_output.max_lines`` in
    config.yaml (defaults to the module-level ``MAX_LINES`` constant).
    """
    from tools.tool_output_limits import get_max_lines
    max_lines = get_max_lines()
    normalized_offset = max(1, _coerce_int(offset, DEFAULT_READ_OFFSET))
    normalized_limit = _coerce_int(limit, DEFAULT_READ_LIMIT)
    normalized_limit = max(1, min(normalized_limit, max_lines))
    return normalized_offset, normalized_limit


def normalize_search_pagination(offset: Any = DEFAULT_SEARCH_OFFSET,
                                limit: Any = DEFAULT_SEARCH_LIMIT) -> tuple[int, int]:
    """Return safe search pagination bounds for shell head/tail pipelines."""
    normalized_offset = max(0, _coerce_int(offset, DEFAULT_SEARCH_OFFSET))
    normalized_limit = max(1, _coerce_int(limit, DEFAULT_SEARCH_LIMIT))
    return normalized_offset, normalized_limit
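
# Clamping examples (illustrative, editorial note): normalize_read_pagination("0", -5)
# yields (1, 1); normalize_search_pagination(None, "abc") falls back to the
# defaults, (0, 50).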

class ShellFileOperations(FileOperations):
    """
    File operations implemented via shell commands.

    Works with ANY terminal backend that has execute(command, cwd) method.
    This includes local, docker, singularity, ssh, modal, and daytona environments.
    """

    def __init__(self, terminal_env, cwd: str = None):
        """
        Initialize file operations with a terminal environment.

        Args:
            terminal_env: Any object with execute(command, cwd) method.
                Returns {"output": str, "returncode": int}
            cwd: Optional explicit fallback cwd when the terminal env has
                no cwd attribute (rare — most backends track cwd live).

        Note:
            Every _exec() call prefers the LIVE ``terminal_env.cwd`` over
            ``self.cwd`` so ``cd`` commands run via the terminal tool are
            picked up immediately. ``self.cwd`` is only used as a fallback
            when the env has no cwd at all — it is NOT the authoritative
            cwd, despite being settable at init time.

            Historical bug (fixed): prior versions of this class used the
            init-time cwd for every _exec() call, which caused relative
            paths passed to patch/read/write to target the wrong directory
            after the user ran ``cd`` in the terminal. Patches would
            claim success and return a plausible diff but land in the
            original directory, producing apparent silent failures.
        """
        self.env = terminal_env
        # Determine cwd from various possible sources.
        # IMPORTANT: do NOT fall back to os.getcwd() -- that's the HOST's local
        # path which doesn't exist inside container/cloud backends (modal, docker).
        # If nothing provides a cwd, use "/" as a safe universal default.
        self.cwd = cwd or getattr(terminal_env, 'cwd', None) or \
            getattr(getattr(terminal_env, 'config', None), 'cwd', None) or "/"
        # Cache for command availability checks
        self._command_cache: Dict[str, bool] = {}

    def _exec(self, command: str, cwd: str = None, timeout: int = None,
              stdin_data: str = None) -> ExecuteResult:
        """Execute command via terminal backend.

        Args:
            stdin_data: If provided, piped to the process's stdin instead of
                embedding in the command string. Bypasses ARG_MAX.

        Cwd resolution order (critical — see class docstring):
            1. Explicit ``cwd`` arg (if provided)
            2. Live ``self.env.cwd`` (tracks ``cd`` commands run via terminal)
            3. Init-time ``self.cwd`` (fallback when env has no cwd attribute)

        This ordering ensures relative paths in file operations follow the
        terminal's current directory — not the directory this file_ops was
        originally created in. See test_file_ops_cwd_tracking.py.
        """
        kwargs = {}
        if timeout:
            kwargs['timeout'] = timeout
        if stdin_data is not None:
            kwargs['stdin_data'] = stdin_data
        # Resolve cwd from the live env so `cd` commands are picked up.
        # Fall through to init-time self.cwd only if the env doesn't track cwd.
        effective_cwd = cwd or getattr(self.env, 'cwd', None) or self.cwd
        result = self.env.execute(command, cwd=effective_cwd, **kwargs)
        return ExecuteResult(
            stdout=result.get("output", ""),
            exit_code=result.get("returncode", 0)
        )

    def _has_command(self, cmd: str) -> bool:
        """Check if a command exists in the environment (cached)."""
        if cmd not in self._command_cache:
            result = self._exec(f"command -v {cmd} >/dev/null 2>&1 && echo 'yes'")
            self._command_cache[cmd] = result.stdout.strip() == 'yes'
        return self._command_cache[cmd]

    def _is_likely_binary(self, path: str, content_sample: str = None) -> bool:
        """
        Check if a file is likely binary.

        Uses extension check (fast) + content analysis (fallback).
        """
        ext = os.path.splitext(path)[1].lower()
        if ext in BINARY_EXTENSIONS:
            return True
        # Content analysis: >30% non-printable chars = binary
        if content_sample:
            non_printable = sum(1 for c in content_sample[:1000]
                                if ord(c) < 32 and c not in '\n\r\t')
            return non_printable / min(len(content_sample), 1000) > 0.30
        return False

    def _is_image(self, path: str) -> bool:
        """Check if file is an image we can return as base64."""
        ext = os.path.splitext(path)[1].lower()
        return ext in IMAGE_EXTENSIONS

    def _add_line_numbers(self, content: str, start_line: int = 1) -> str:
        """Add line numbers to content in LINE_NUM|CONTENT format."""
        from tools.tool_output_limits import get_max_line_length
        max_line_length = get_max_line_length()
        lines = content.split('\n')
        numbered = []
        for i, line in enumerate(lines, start=start_line):
            # Truncate long lines
            if len(line) > max_line_length:
                line = line[:max_line_length] + "... [truncated]"
            numbered.append(f"{i:6d}|{line}")
        return '\n'.join(numbered)

    def _expand_path(self, path: str) -> str:
        """
        Expand shell-style paths like ~ and ~user to absolute paths.

        This must be done BEFORE shell escaping, since ~ doesn't expand
        inside single quotes.
        """
        if not path:
            return path
        # Handle ~ and ~user
        if path.startswith('~'):
            # Get home directory via the terminal environment
            result = self._exec("echo $HOME")
            if result.exit_code == 0 and result.stdout.strip():
                home = result.stdout.strip()
                if path == '~':
                    return home
                elif path.startswith('~/'):
                    return home + path[1:]  # Replace ~ with home
            # ~username format - extract and validate username before
            # letting shell expand it (prevent shell injection via
            # paths like "~; rm -rf /").
            rest = path[1:]  # strip leading ~
            slash_idx = rest.find('/')
            username = rest[:slash_idx] if slash_idx >= 0 else rest
            if username and re.fullmatch(r'[a-zA-Z0-9._-]+', username):
                # Only expand ~username (not the full path) to avoid shell
                # injection via path suffixes like "~user/$(malicious)".
                expand_result = self._exec(f"echo ~{username}")
                if expand_result.exit_code == 0 and expand_result.stdout.strip():
                    user_home = expand_result.stdout.strip()
                    suffix = path[1 + len(username):]  # e.g. "/rest/of/path"
                    return user_home + suffix
        return path

    def _escape_shell_arg(self, arg: str) -> str:
        """Escape a string for safe use in shell commands."""
        # Use single quotes and escape any single quotes in the string
        return "'" + arg.replace("'", "'\"'\"'") + "'"

    def _unified_diff(self, old_content: str, new_content: str, filename: str) -> str:
        """Generate unified diff between old and new content."""
        old_lines = old_content.splitlines(keepends=True)
        new_lines = new_content.splitlines(keepends=True)
        diff = difflib.unified_diff(
            old_lines, new_lines,
            fromfile=f"a/{filename}",
            tofile=f"b/{filename}"
        )
        return ''.join(diff)

    # =========================================================================
    # READ Implementation
    # =========================================================================

    def read_file(self, path: str, offset: int = 1, limit: int = 500) -> ReadResult:
        """
        Read a file with pagination, binary detection, and line numbers.

        Args:
            path: File path (absolute or relative to cwd)
            offset: Line number to start from (1-indexed, default 1)
            limit: Maximum lines to return (default 500, max 2000)

        Returns:
            ReadResult with content, metadata, or error info
        """
        # Expand ~ and other shell paths
        path = self._expand_path(path)
        offset, limit = normalize_read_pagination(offset, limit)
        # Check if file exists and get size (wc -c is POSIX, works on Linux + macOS)
        stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null"
        stat_result = self._exec(stat_cmd)
        if stat_result.exit_code != 0:
            # File not found - try to suggest similar files
            return self._suggest_similar_files(path)
        stat_output = _strip_terminal_fence_leaks(stat_result.stdout)
        try:
            file_size = int(stat_output.strip())
        except ValueError:
            file_size = 0
        # Check if file is too large
        if file_size > MAX_FILE_SIZE:
            # Still try to read, but warn
            pass
        # Images are never inlined — redirect to the vision tool
        if self._is_image(path):
            return ReadResult(
                is_image=True,
                is_binary=True,
                file_size=file_size,
                hint=(
                    "Image file detected. Automatically redirected to vision_analyze tool. "
                    "Use vision_analyze with this file path to inspect the image contents."
                ),
            )
        # Read a sample to check for binary content
        sample_cmd = f"head -c 1000 {self._escape_shell_arg(path)} 2>/dev/null"
        sample_result = self._exec(sample_cmd)
        sample_output = _strip_terminal_fence_leaks(sample_result.stdout)
        if self._is_likely_binary(path, sample_output):
            return ReadResult(
                is_binary=True,
                file_size=file_size,
                error="Binary file - cannot display as text. Use appropriate tools to handle this file type."
            )
        # Read with pagination using sed
        end_line = offset + limit - 1
        read_cmd = f"sed -n '{offset},{end_line}p' {self._escape_shell_arg(path)}"
        read_result = self._exec(read_cmd)
        if read_result.exit_code != 0:
            return ReadResult(error=f"Failed to read file: {read_result.stdout}")
        read_output = _strip_terminal_fence_leaks(read_result.stdout)
        # Get total line count
        wc_cmd = f"wc -l < {self._escape_shell_arg(path)}"
        wc_result = self._exec(wc_cmd)
        wc_output = _strip_terminal_fence_leaks(wc_result.stdout)
        try:
            total_lines = int(wc_output.strip())
        except ValueError:
            total_lines = 0
        # Check if truncated
        truncated = total_lines > end_line
        hint = None
        if truncated:
            hint = f"Use offset={end_line + 1} to continue reading (showing {offset}-{end_line} of {total_lines} lines)"
        return ReadResult(
            content=self._add_line_numbers(read_output, offset),
            total_lines=total_lines,
            file_size=file_size,
            truncated=truncated,
            hint=hint
        )

    def _suggest_similar_files(self, path: str) -> ReadResult:
        """Suggest similar files when the requested file is not found."""
        dir_path = os.path.dirname(path) or "."
        filename = os.path.basename(path)
        basename_no_ext = os.path.splitext(filename)[0]
        ext = os.path.splitext(filename)[1].lower()
        lower_name = filename.lower()
        # List files in the target directory
        ls_cmd = f"ls -1 {self._escape_shell_arg(dir_path)} 2>/dev/null | head -50"
        ls_result = self._exec(ls_cmd)
        scored: list = []  # (score, filepath) — higher is better
        if ls_result.exit_code == 0 and ls_result.stdout.strip():
            for f in ls_result.stdout.strip().split('\n'):
                if not f:
                    continue
                lf = f.lower()
                score = 0
                # Exact match (shouldn't happen, but guard)
                if lf == lower_name:
                    score = 100
                # Same base name, different extension (e.g. config.yml vs config.yaml)
                elif os.path.splitext(f)[0].lower() == basename_no_ext.lower():
                    score = 90
                # Target is prefix of candidate or vice-versa
                elif lf.startswith(lower_name) or lower_name.startswith(lf):
                    score = 70
                # Substring match (candidate contains query)
                elif lower_name in lf:
                    score = 60
                # Reverse substring (query contains candidate name)
                elif lf in lower_name and len(lf) > 2:
                    score = 40
                # Same extension with some overlap
                elif ext and os.path.splitext(f)[1].lower() == ext:
                    common = set(lower_name) & set(lf)
                    if len(common) >= max(len(lower_name), len(lf)) * 0.4:
                        score = 30
                if score > 0:
                    scored.append((score, os.path.join(dir_path, f)))
        scored.sort(key=lambda x: -x[0])
        similar = [fp for _, fp in scored[:5]]
        return ReadResult(
            error=f"File not found: {path}",
            similar_files=similar
        )

    def read_file_raw(self, path: str) -> ReadResult:
        """Read the complete file content as a plain string.

        No pagination, no line-number prefixes, no per-line truncation.
        Uses cat so the full file is returned regardless of size.
        """
        path = self._expand_path(path)
        stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null"
        stat_result = self._exec(stat_cmd)
        if stat_result.exit_code != 0:
            return self._suggest_similar_files(path)
        stat_output = _strip_terminal_fence_leaks(stat_result.stdout)
        try:
            file_size = int(stat_output.strip())
        except ValueError:
            file_size = 0
        if self._is_image(path):
            return ReadResult(is_image=True, is_binary=True, file_size=file_size)
        sample_result = self._exec(f"head -c 1000 {self._escape_shell_arg(path)} 2>/dev/null")
        sample_output = _strip_terminal_fence_leaks(sample_result.stdout)
        if self._is_likely_binary(path, sample_output):
            return ReadResult(
                is_binary=True, file_size=file_size,
                error="Binary file — cannot display as text."
            )
        cat_result = self._exec(f"cat {self._escape_shell_arg(path)}")
        if cat_result.exit_code != 0:
            return ReadResult(error=f"Failed to read file: {cat_result.stdout}")
        return ReadResult(
            content=_strip_terminal_fence_leaks(cat_result.stdout),
            file_size=file_size,
        )

    def delete_file(self, path: str) -> WriteResult:
        """Delete a file via rm."""
        path = self._expand_path(path)
        if _is_write_denied(path):
            return WriteResult(error=f"Delete denied: {path} is a protected path")
        result = self._exec(f"rm -f {self._escape_shell_arg(path)}")
        if result.exit_code != 0:
            return WriteResult(error=f"Failed to delete {path}: {result.stdout}")
        return WriteResult()

    def move_file(self, src: str, dst: str) -> WriteResult:
        """Move a file via mv."""
        src = self._expand_path(src)
        dst = self._expand_path(dst)
        for p in (src, dst):
            if _is_write_denied(p):
                return WriteResult(error=f"Move denied: {p} is a protected path")
        result = self._exec(
            f"mv {self._escape_shell_arg(src)} {self._escape_shell_arg(dst)}"
        )
        if result.exit_code != 0:
            return WriteResult(error=f"Failed to move {src} -> {dst}: {result.stdout}")
        return WriteResult()

    # =========================================================================
    # WRITE Implementation
    # =========================================================================

    def write_file(self, path: str, content: str) -> WriteResult:
        """
        Write content to a file, creating parent directories as needed.

        Pipes content through stdin to avoid OS ARG_MAX limits on large
        files. The content never appears in the shell command string —
        only the file path does.

        After the write, runs a post-first / pre-lazy lint check via
        ``_check_lint_delta()``. If the new content is clean, the lint
        call is O(one parse). If the new content has errors, the pre-write
        content is linted too and only errors newly introduced by this
        write are surfaced — pre-existing problems are filtered out so
        the agent isn't distracted chasing them.

        Args:
            path: File path to write
            content: Content to write

        Returns:
            WriteResult with bytes written, lint summary, or error.
        """
        # Expand ~ and other shell paths
        path = self._expand_path(path)
        # Block writes to sensitive paths
        if _is_write_denied(path):
            return WriteResult(error=f"Write denied: '{path}' is a protected system/credential file.")
        # Capture pre-write content for lint-delta computation. Only do this
        # when an in-process OR shell linter exists for this extension — no
        # point paying for the read otherwise. For in-process linters we
        # pass the content directly; for shell linters the pre-state isn't
        # useful (we'd have to re-write-read to lint the old version, which
        # defeats the purpose), so we skip the capture and accept the naive
        # "all errors" report.
        ext = os.path.splitext(path)[1].lower()
        pre_content: Optional[str] = None
        if ext in LINTERS_INPROC:
            # Best-effort read; failure (file missing, permission) leaves
            # pre_content as None which makes the delta step degrade
            # gracefully to "report all errors".
            read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
            read_result = self._exec(read_cmd)
            if read_result.exit_code == 0 and read_result.stdout:
                pre_content = read_result.stdout
        # Create parent directories
        parent = os.path.dirname(path)
        dirs_created = False
        if parent:
            mkdir_cmd = f"mkdir -p {self._escape_shell_arg(parent)}"
            mkdir_result = self._exec(mkdir_cmd)
            if mkdir_result.exit_code == 0:
                dirs_created = True
        # Write via stdin pipe — content bypasses shell arg parsing entirely,
        # so there's no ARG_MAX limit regardless of file size.
        write_cmd = f"cat > {self._escape_shell_arg(path)}"
        write_result = self._exec(write_cmd, stdin_data=content)
        if write_result.exit_code != 0:
            return WriteResult(error=f"Failed to write file: {write_result.stdout}")
        # Get bytes written (wc -c is POSIX, works on Linux + macOS)
        stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null"
        stat_result = self._exec(stat_cmd)
        try:
            bytes_written = int(stat_result.stdout.strip())
        except ValueError:
            bytes_written = len(content.encode('utf-8'))
        # Post-write lint with delta refinement.
        lint_result = self._check_lint_delta(path, pre_content=pre_content, post_content=content)
        return WriteResult(
            bytes_written=bytes_written,
            dirs_created=dirs_created,
            lint=lint_result.to_dict() if lint_result else None,
        )

    # =========================================================================
    # PATCH Implementation (Replace Mode)
    # =========================================================================

    def patch_replace(self, path: str, old_string: str, new_string: str,
                      replace_all: bool = False) -> PatchResult:
        """
        Replace text in a file using fuzzy matching.

        Args:
            path: File path to modify
            old_string: Text to find (must be unique unless replace_all=True)
            new_string: Replacement text
            replace_all: If True, replace all occurrences

        Returns:
            PatchResult with diff and lint results
        """
        # Expand ~ and other shell paths
        path = self._expand_path(path)
        # Block writes to sensitive paths
        if _is_write_denied(path):
            return PatchResult(error=f"Write denied: '{path}' is a protected system/credential file.")
        # Read current content
        read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
        read_result = self._exec(read_cmd)
        if read_result.exit_code != 0:
            return PatchResult(error=f"Failed to read file: {path}")
        content = read_result.stdout
        # Import and use fuzzy matching
        from tools.fuzzy_match import fuzzy_find_and_replace
        new_content, match_count, _strategy, error = fuzzy_find_and_replace(
            content, old_string, new_string, replace_all
        )
        if error or match_count == 0:
            err_msg = error or f"Could not find match for old_string in {path}"
            try:
                from tools.fuzzy_match import format_no_match_hint
                err_msg += format_no_match_hint(err_msg, match_count, old_string, content)
            except Exception:
                pass
            return PatchResult(error=err_msg)
        # Write back
        write_result = self.write_file(path, new_content)
        if write_result.error:
            return PatchResult(error=f"Failed to write changes: {write_result.error}")
        # Post-write verification — re-read the file and confirm the bytes we
        # intended to write actually landed. Catches silent persistence
        # failures (backend FS oddities, race with another task, truncated
        # pipe, etc.) that would otherwise return success-with-diff while the
        # file is unchanged on disk.
        verify_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
        verify_result = self._exec(verify_cmd)
        if verify_result.exit_code != 0:
            return PatchResult(error=f"Post-write verification failed: could not re-read {path}")
        if verify_result.stdout != new_content:
            return PatchResult(error=(
                f"Post-write verification failed for {path}: on-disk content "
                f"differs from intended write "
                f"(wrote {len(new_content)} chars, read back {len(verify_result.stdout)}). "
                "The patch did not persist. Re-read the file and try again."
            ))
        # Generate diff
        diff = self._unified_diff(content, new_content, path)
        # Auto-lint with delta refinement: only surface errors introduced
        # by this patch, filtering out pre-existing lint failures so the
        # agent isn't distracted by problems that were already there.
        lint_result = self._check_lint_delta(path, pre_content=content, post_content=new_content)
        return PatchResult(
            success=True,
            diff=diff,
            files_modified=[path],
            lint=lint_result.to_dict() if lint_result else None
        )

    def patch_v4a(self, patch_content: str) -> PatchResult:
        """
        Apply a V4A format patch.

        V4A format:
            *** Begin Patch
            *** Update File: path/to/file.py
            @@ context hint @@
            context line
            -removed line
            +added line
            *** End Patch

        Args:
            patch_content: V4A format patch string

        Returns:
            PatchResult with changes made
        """
        # Import patch parser
        from tools.patch_parser import parse_v4a_patch, apply_v4a_operations
        operations, parse_error = parse_v4a_patch(patch_content)
        if parse_error:
            return PatchResult(error=f"Failed to parse patch: {parse_error}")
        # Apply operations
        result = apply_v4a_operations(operations, self)
        return result

    def _check_lint(self, path: str, content: Optional[str] = None) -> LintResult:
        """
        Run syntax check on a file after editing.

        Prefers the in-process linter for structured formats (JSON, YAML,
        TOML) when possible — those parse via the Python stdlib in
        microseconds and don't require a subprocess. Falls back to the
        shell linter table for compiled/type-checked languages
        (py_compile, node --check, tsc, go vet, rustfmt).

        Args:
            path: File path (used to select the linter + for shell invocation).
            content: Optional file content. If provided AND an in-process
                linter matches the extension, we lint the content
                directly without re-reading the file from disk. Ignored
                for shell linters.

        Returns:
            LintResult with status and any errors.
        """
        ext = os.path.splitext(path)[1].lower()
        # Prefer in-process linter when available.
        inproc = LINTERS_INPROC.get(ext)
        if inproc is not None:
            # Need content — either passed in or read from disk.
            if content is None:
                read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
                read_result = self._exec(read_cmd)
                if read_result.exit_code != 0:
                    return LintResult(skipped=True, message=f"Failed to read {path} for lint")
                content = read_result.stdout
            ok, err = inproc(content)
            if err == "__SKIP__":
                return LintResult(skipped=True, message=f"No linter available for {ext} (missing dependency)")
            return LintResult(success=ok, output="" if ok else err)
        # Fall back to shell linter.
        if ext not in LINTERS:
            return LintResult(skipped=True, message=f"No linter for {ext} files")
        linter_cmd = LINTERS[ext]
        # Extract the base command (first word)
        base_cmd = linter_cmd.split()[0]
        if not self._has_command(base_cmd):
            return LintResult(skipped=True, message=f"{base_cmd} not available")
        # Run linter
        cmd = linter_cmd.replace("{file}", self._escape_shell_arg(path))
        result = self._exec(cmd, timeout=30)
        return LintResult(
            success=result.exit_code == 0,
            output=result.stdout.strip() if result.stdout.strip() else ""
        )

    def _check_lint_delta(self, path: str, pre_content: Optional[str],
                          post_content: Optional[str] = None) -> LintResult:
        """
        Run post-write lint with pre-write baseline comparison.

        Strategy (post-first, pre-lazy):
            1. Lint the post-write state. If clean → return clean immediately.
               This is the hot path and matches _check_lint() in cost.
            2. If post-lint found errors AND we have pre-write content, lint
               that too. If the pre-write file was already broken, return only
               the *new* errors introduced by this edit — errors that existed
               before aren't the agent's problem to chase right now.
            3. If pre_content is None (new file or unavailable), skip the delta
               step and return all post-write errors.

        This mirrors Cline's and OpenCode's post-edit LSP pattern: surface
        only the errors this specific edit introduced, so the agent doesn't
        get distracted by pre-existing problems.

        Args:
            path: File path (for linter selection).
            pre_content: File content BEFORE the write. Pass None for new
                files or when the pre-state isn't available — the
                delta refinement is skipped and all post errors
                are returned.
            post_content: File content AFTER the write. Optional; if None,
                the shell linter reads from disk (same as
                _check_lint).

        Returns:
            LintResult. ``output`` contains either the full post-lint
            errors (no pre-state) or just the new-error lines (delta
            refinement applied).
        """
        post = self._check_lint(path, content=post_content)
        # Hot path: clean post-write, no pre-lint needed.
        if post.success or post.skipped:
            return post
        # Post-write has errors. If we have pre-content, run the delta
        # refinement to filter out pre-existing errors.
        if pre_content is None:
            return post
        pre = self._check_lint(path, content=pre_content)
        if pre.success or pre.skipped or not pre.output:
            # Pre-write was clean (or we couldn't lint it) — post errors
            # are all new. Return the full post output.
            return post
        # Both pre- and post-write had errors. Compute the set-difference
        # on non-empty stripped lines. Caveat: single-error parsers
        # (ast.parse, json.loads) stop at the first error and don't report
        # later ones — if the pre-existing error blocks parsing before
        # reaching the edit region, we can't prove the edit is clean. So
        # if every post error also appeared pre-edit, we report the file
        # as still broken but annotate that this edit introduced nothing
        # new on top — the agent knows it's inherited state, not fresh
        # damage, without silently dropping the error.
        pre_lines = {ln.strip() for ln in pre.output.splitlines() if ln.strip()}
        post_lines = [ln for ln in post.output.splitlines() if ln.strip() and ln.strip() not in pre_lines]
        if not post_lines:
            # Every error in post was also in pre — this edit didn't make
            # anything obviously worse, but the file remains broken and
            # the agent should know.
            return LintResult(
                success=False,
                output=post.output,
                message="Pre-existing lint errors — this edit didn't introduce new ones but the file is still broken.",
            )
        return LintResult(
            success=False,
            output=(
                "New lint errors introduced by this edit "
                "(pre-existing errors filtered out):\n" + "\n".join(post_lines)
            )
        )
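
    # Worked example (illustrative, editorial note): if pre.output and
    # post.output are both exactly "JSONDecodeError: Expecting value (line 2,
    # column 1)", the "pre-existing" branch above fires; if post.output
    # carries that line plus one more, only the extra line is reported as
    # newly introduced.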

    # =========================================================================
    # SEARCH Implementation
    # =========================================================================

    def search(self, pattern: str, path: str = ".", target: str = "content",
               file_glob: Optional[str] = None, limit: int = 50, offset: int = 0,
               output_mode: str = "content", context: int = 0) -> SearchResult:
        """
        Search for content or files.

        Args:
            pattern: Regex (for content) or glob pattern (for files)
            path: Directory/file to search (default: cwd)
            target: "content" (grep) or "files" (glob)
            file_glob: File pattern filter for content search (e.g., "*.py")
            limit: Max results (default 50)
            offset: Skip first N results
            output_mode: "content", "files_only", or "count"
            context: Lines of context around matches

        Returns:
            SearchResult with matches or file list
        """
        offset, limit = normalize_search_pagination(offset, limit)
        # Expand ~ and other shell paths
        path = self._expand_path(path)
        # Validate that the path exists before searching
        check = self._exec(f"test -e {self._escape_shell_arg(path)} && echo exists || echo not_found")
        if "not_found" in check.stdout:
            # Try to suggest nearby paths
            parent = os.path.dirname(path) or "."
            basename_query = os.path.basename(path)
            hint_parts = [f"Path not found: {path}"]
            # Check if parent directory exists and list similar entries
            parent_check = self._exec(
                f"test -d {self._escape_shell_arg(parent)} && echo yes || echo no"
            )
            if "yes" in parent_check.stdout and basename_query:
                ls_result = self._exec(
                    f"ls -1 {self._escape_shell_arg(parent)} 2>/dev/null | head -20"
                )
                if ls_result.exit_code == 0 and ls_result.stdout.strip():
                    lower_q = basename_query.lower()
                    candidates = []
                    for entry in ls_result.stdout.strip().split('\n'):
                        if not entry:
                            continue
                        le = entry.lower()
                        if lower_q in le or le in lower_q or le.startswith(lower_q[:3]):
                            candidates.append(os.path.join(parent, entry))
                    if candidates:
                        hint_parts.append(
                            "Similar paths: " + ", ".join(candidates[:5])
                        )
            return SearchResult(
                error=". ".join(hint_parts),
                total_count=0
            )
        if target == "files":
            return self._search_files(pattern, path, limit, offset)
        else:
            return self._search_content(pattern, path, file_glob, limit, offset,
                                        output_mode, context)

    def _search_files(self, pattern: str, path: str, limit: int, offset: int) -> SearchResult:
        """Search for files by name pattern (glob-like)."""
        # Bare names search as-is; path-like patterns are reduced to their
        # basename so name matching applies at any depth.
        if not pattern.startswith('**/') and '/' not in pattern:
            search_pattern = pattern
        else:
            search_pattern = pattern.split('/')[-1]
        search_root = Path(path)
        has_hidden_path_ancestor = any(
            part not in (".", "..") and part.startswith(".")
            for part in search_root.parts
        )
        # Prefer ripgrep: respects .gitignore, excludes hidden dirs by
        # default, and has parallel directory traversal (~200x faster than
        # find on wide trees). Mirrors _search_content which already uses rg.
        if self._has_command('rg'):
            return self._search_files_rg(search_pattern, path, limit, offset)
        # Fallback: find (slower, no .gitignore awareness)
        if not self._has_command('find'):
            return SearchResult(
                error="File search requires 'rg' (ripgrep) or 'find'. "
                      "Install ripgrep for best results: "
                      "https://github.com/BurntSushi/ripgrep#installation"
            )
        # Exclude hidden directories (matching ripgrep's default behavior).
        hidden_exclude = "-not -path '*/.*'" if not has_hidden_path_ancestor else ""
        hidden_filter_expr = f" {hidden_exclude}" if hidden_exclude else ""
        # Use shell pagination for standard roots. For hidden roots, gather full
        # output so we can re-apply hidden-descendant filtering while allowing
        # explicit hidden-root searches.
        pagination_expr = ""
        if not has_hidden_path_ancestor:
            pagination_expr = f" | tail -n +{offset + 1} | head -n {limit}"
        cmd = f"find {self._escape_shell_arg(path)}{hidden_filter_expr} -type f -name {self._escape_shell_arg(search_pattern)} " \
              f"-printf '%T@ %p\\n' 2>/dev/null | sort -rn{pagination_expr}"
        result = self._exec(cmd, timeout=60)
        if not result.stdout.strip():
            # Try without -printf (BSD find compatibility -- macOS)
            cmd_simple = f"find {self._escape_shell_arg(path)}{hidden_filter_expr} -type f -name {self._escape_shell_arg(search_pattern)} " \
                         f"2>/dev/null | sort -rn{pagination_expr}"
            result = self._exec(cmd_simple, timeout=60)
        files = []
        for line in result.stdout.strip().split('\n'):
            if not line:
                continue
            parts = line.split(' ', 1)
            if len(parts) == 2 and parts[0].replace('.', '').isdigit():
                files.append(parts[1])
            else:
                files.append(line)
        # For explicit hidden roots, find's path-based filtering excludes every
        # file under the hidden path. Apply descendant filtering after command
        # execution so only the explicit root ancestry is bypassed.
        if has_hidden_path_ancestor:
            normalized_root = search_root.resolve()
            filtered_files = []
            for file_path in files:
                try:
                    rel_parts = Path(file_path).resolve().relative_to(normalized_root).parts
                except ValueError:
                    rel_parts = Path(file_path).parts
                if any(part not in (".", "..") and part.startswith(".") for part in rel_parts):
                    continue
                filtered_files.append(file_path)
            files = filtered_files[offset:offset + limit]
        # pagination for standard roots is already applied in shell
        return SearchResult(
            files=files,
            total_count=len(files)
        )

    def _search_files_rg(self, pattern: str, path: str, limit: int, offset: int) -> SearchResult:
        """Search for files by name using ripgrep's --files mode.

        rg --files respects .gitignore and excludes hidden directories by
        default, and uses parallel directory traversal for ~200x speedup
        over find on wide trees. Results are sorted by modification time
        (most recently edited first) when rg >= 13.0 supports --sortr.
        """
        # rg --files -g uses glob patterns; wrap bare names so they match
        # at any depth (equivalent to find -name).
        if '/' not in pattern and not pattern.startswith('*'):
            glob_pattern = f"*{pattern}"
        else:
            glob_pattern = pattern
        fetch_limit = limit + offset
        # Try mtime-sorted first (rg 13+); fall back to unsorted if not supported.
        cmd_sorted = (
            f"rg --files --sortr=modified -g {self._escape_shell_arg(glob_pattern)} "
            f"{self._escape_shell_arg(path)} 2>/dev/null "
            f"| head -n {fetch_limit}"
        )
        result = self._exec(cmd_sorted, timeout=60)
        all_files = [f for f in result.stdout.strip().split('\n') if f]
        if not all_files:
            # --sortr may have failed on older rg; retry without it.
            cmd_plain = (
                f"rg --files -g {self._escape_shell_arg(glob_pattern)} "
                f"{self._escape_shell_arg(path)} 2>/dev/null "
                f"| head -n {fetch_limit}"
            )
            result = self._exec(cmd_plain, timeout=60)
            all_files = [f for f in result.stdout.strip().split('\n') if f]
        page = all_files[offset:offset + limit]
        return SearchResult(
            files=page,
            total_count=len(all_files),
            truncated=len(all_files) >= fetch_limit,
        )

    def _search_content(self, pattern: str, path: str, file_glob: Optional[str],
                        limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
        """Search for content inside files (grep-like)."""
        # Try ripgrep first (fast), fallback to grep (slower but works)
        if self._has_command('rg'):
            return self._search_with_rg(pattern, path, file_glob, limit, offset,
                                        output_mode, context)
        elif self._has_command('grep'):
            return self._search_with_grep(pattern, path, file_glob, limit, offset,
                                          output_mode, context)
        else:
            # Neither rg nor grep available (Windows without Git Bash, etc.)
            return SearchResult(
                error="Content search requires ripgrep (rg) or grep. "
                      "Install ripgrep: https://github.com/BurntSushi/ripgrep#installation"
            )

    def _search_with_rg(self, pattern: str, path: str, file_glob: Optional[str],
                        limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
        """Search using ripgrep."""
        cmd_parts = ["rg", "--line-number", "--no-heading", "--with-filename"]
        # Add context if requested
        if context > 0:
            cmd_parts.extend(["-C", str(context)])
        # Add file glob filter (must be quoted to prevent shell expansion)
        if file_glob:
            cmd_parts.extend(["--glob", self._escape_shell_arg(file_glob)])
        # Output mode handling
        if output_mode == "files_only":
            cmd_parts.append("-l")  # Files only
        elif output_mode == "count":
            cmd_parts.append("-c")  # Count per file
        # Add pattern and path
        cmd_parts.append(self._escape_shell_arg(pattern))
        cmd_parts.append(self._escape_shell_arg(path))
        # Fetch extra rows so we can report the true total before slicing.
        # For context mode, rg emits separator lines ("--") between groups,
        # so we grab generously and filter in Python.
        fetch_limit = limit + offset + 200 if context > 0 else limit + offset
        cmd_parts.extend(["|", "head", "-n", str(fetch_limit)])
        cmd = " ".join(cmd_parts)
        result = self._exec(cmd, timeout=60)
        # rg exit codes: 0=matches found, 1=no matches, 2=error
        if result.exit_code == 2 and not result.stdout.strip():
            error_msg = result.stderr.strip() if hasattr(result, 'stderr') and result.stderr else "Search error"
            return SearchResult(error=f"Search failed: {error_msg}", total_count=0)
        # Parse results based on output mode
        if output_mode == "files_only":
            all_files = [f for f in result.stdout.strip().split('\n') if f]
            total = len(all_files)
            page = all_files[offset:offset + limit]
            return SearchResult(files=page, total_count=total)
        elif output_mode == "count":
            counts = {}
            for line in result.stdout.strip().split('\n'):
                if ':' in line:
                    parts = line.rsplit(':', 1)
                    if len(parts) == 2:
                        try:
                            counts[parts[0]] = int(parts[1])
                        except ValueError:
                            pass
            return SearchResult(counts=counts, total_count=sum(counts.values()))
        else:
            # Parse content matches and context lines.
            # rg match lines:   "file:lineno:content" (colon separator)
            # rg context lines: "file-lineno-content" (dash separator)
            # rg group seps:    "--"
            # Note: on Windows, paths contain drive letters (e.g. C:\path),
            # so naive split(":") breaks. Use regex to handle both platforms.
            _match_re = re.compile(r'^([A-Za-z]:)?(.*?):(\d+):(.*)$')
            matches = []
            for line in result.stdout.strip().split('\n'):
                if not line or line == "--":
                    continue
                # Try match line first (colon-separated: file:line:content)
                m = _match_re.match(line)
                if m:
                    matches.append(SearchMatch(
                        path=(m.group(1) or '') + m.group(2),
                        line_number=int(m.group(3)),
                        content=m.group(4)[:500]
                    ))
                    continue
                # Try context line (dash-separated: file-line-content)
                # Only attempt if context was requested to avoid false positives
                if context > 0:
                    parsed = _parse_search_context_line(line)
                    if parsed:
                        matches.append(SearchMatch(
                            path=parsed[0],
                            line_number=parsed[1],
                            content=parsed[2][:500]
                        ))
            total = len(matches)
            page = matches[offset:offset + limit]
            return SearchResult(
                matches=page,
                total_count=total,
                truncated=total > offset + limit
            )

    def _search_with_grep(self, pattern: str, path: str, file_glob: Optional[str],
                          limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
        """Fallback search using grep."""
        cmd_parts = ["grep", "-rnH"]  # -H forces filename even for single-file searches
        # Exclude hidden directories (matching ripgrep's default behavior).
        # This prevents searching inside .hub/index-cache/, .git/, etc.
        cmd_parts.append("--exclude-dir='.*'")
        # Add context if requested
        if context > 0:
            cmd_parts.extend(["-C", str(context)])
        # Add file pattern filter (must be quoted to prevent shell expansion)
        if file_glob:
            cmd_parts.extend(["--include", self._escape_shell_arg(file_glob)])
        # Output mode handling
        if output_mode == "files_only":
            cmd_parts.append("-l")
        elif output_mode == "count":
            cmd_parts.append("-c")
        # Add pattern and path
        cmd_parts.append(self._escape_shell_arg(pattern))
        cmd_parts.append(self._escape_shell_arg(path))
        # Fetch generously so we can compute total before slicing
        fetch_limit = limit + offset + (200 if context > 0 else 0)
        cmd_parts.extend(["|", "head", "-n", str(fetch_limit)])
        cmd = " ".join(cmd_parts)
        result = self._exec(cmd, timeout=60)
        # grep exit codes: 0=matches found, 1=no matches, 2=error
        if result.exit_code == 2 and not result.stdout.strip():
            error_msg = result.stderr.strip() if hasattr(result, 'stderr') and result.stderr else "Search error"
            return SearchResult(error=f"Search failed: {error_msg}", total_count=0)
        if output_mode == "files_only":
            all_files = [f for f in result.stdout.strip().split('\n') if f]
            total = len(all_files)
            page = all_files[offset:offset + limit]
            return SearchResult(files=page, total_count=total)
        elif output_mode == "count":
            counts = {}
            for line in result.stdout.strip().split('\n'):
                if ':' in line:
                    parts = line.rsplit(':', 1)
                    if len(parts) == 2:
                        try:
                            counts[parts[0]] = int(parts[1])
                        except ValueError:
                            pass
            return SearchResult(counts=counts, total_count=sum(counts.values()))
        else:
            # grep match lines:   "file:lineno:content" (colon)
            # grep context lines: "file-lineno-content" (dash)
            # grep group seps:    "--"
            # Note: on Windows, paths contain drive letters (e.g. C:\path),
            # so naive split(":") breaks. Use regex to handle both platforms.
            _match_re = re.compile(r'^([A-Za-z]:)?(.*?):(\d+):(.*)$')
            matches = []
            for line in result.stdout.strip().split('\n'):
                if not line or line == "--":
                    continue
                m = _match_re.match(line)
                if m:
                    matches.append(SearchMatch(
                        path=(m.group(1) or '') + m.group(2),
                        line_number=int(m.group(3)),
                        content=m.group(4)[:500]
                    ))
                    continue
                if context > 0:
                    parsed = _parse_search_context_line(line)
                    if parsed:
                        matches.append(SearchMatch(
                            path=parsed[0],
                            line_number=parsed[1],
                            content=parsed[2][:500]
                        ))
            total = len(matches)
            page = matches[offset:offset + limit]
            return SearchResult(
                matches=page,
                total_count=total,
                truncated=total > offset + limit
            )