Files
hermes-agent/tests/tools/test_file_read_guards.py
Teknium a32d07529c fix(file-tools): escalate to BLOCKED on repeated read_file dedup stubs (#16382)
read_file's dedup path returned a lightweight stub on re-reads of an
unchanged file, then returned early — so the consecutive-read loop
guard (hard block at count>=4) at the bottom of read_file_tool never
ran for stub-looped calls. Weaker tool-following models (local Qwen3.6
variants in the reported case) ignore the passive 'refer to earlier
result' hint and hammer the same read_file call until iteration budget
runs out.

Track per-key stub returns in task_data['dedup_hits'] and, on the
second stub for the same (path, offset, limit), return a hard BLOCKED
error mirroring the wording the real-read path already uses. A real
read, an intervening non-read tool call (notify_other_tool_call), or
reset_file_dedup (on context compression) all clear the counter so
the guard never stays engaged longer than the actual loop.

Closes #15759
2026-04-27 00:17:26 -07:00

775 lines
31 KiB
Python

#!/usr/bin/env python3
"""
Tests for read_file_tool safety guards: device-path blocking,
character-count limits, file deduplication, and dedup reset on
context compression.
Run with: python -m pytest tests/tools/test_file_read_guards.py -v
"""
import json
import os
import tempfile
import time
import unittest
from unittest.mock import patch, MagicMock
from tools.file_tools import (
read_file_tool,
write_file_tool,
reset_file_dedup,
_is_blocked_device,
_invalidate_dedup_for_path,
_READ_DEDUP_STATUS_MESSAGE,
_get_max_read_chars,
_DEFAULT_MAX_READ_CHARS,
_read_tracker,
notify_other_tool_call,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
class _FakeReadResult:
"""Minimal stand-in for FileOperations.read_file return value."""
def __init__(self, content="line1\nline2\n", total_lines=2, file_size=100):
self.content = content
self._total_lines = total_lines
self._file_size = file_size
def to_dict(self):
return {
"content": self.content,
"total_lines": self._total_lines,
"file_size": self._file_size,
}
def _make_fake_ops(content="hello\n", total_lines=1, file_size=6):
fake = MagicMock()
fake.read_file = lambda path, offset=1, limit=500: _FakeReadResult(
content=content, total_lines=total_lines, file_size=file_size,
)
return fake
# ---------------------------------------------------------------------------
# Device path blocking
# ---------------------------------------------------------------------------
class TestDevicePathBlocking(unittest.TestCase):
"""Paths like /dev/zero should be rejected before any I/O."""
def test_blocked_device_detection(self):
for dev in ("/dev/zero", "/dev/random", "/dev/urandom", "/dev/stdin",
"/dev/tty", "/dev/console", "/dev/stdout", "/dev/stderr",
"/dev/fd/0", "/dev/fd/1", "/dev/fd/2"):
self.assertTrue(_is_blocked_device(dev), f"{dev} should be blocked")
def test_safe_device_not_blocked(self):
self.assertFalse(_is_blocked_device("/dev/null"))
self.assertFalse(_is_blocked_device("/dev/sda1"))
def test_proc_fd_blocked(self):
self.assertTrue(_is_blocked_device("/proc/self/fd/0"))
self.assertTrue(_is_blocked_device("/proc/12345/fd/2"))
def test_proc_fd_other_not_blocked(self):
self.assertFalse(_is_blocked_device("/proc/self/fd/3"))
self.assertFalse(_is_blocked_device("/proc/self/maps"))
def test_normal_files_not_blocked(self):
self.assertFalse(_is_blocked_device("/tmp/test.py"))
self.assertFalse(_is_blocked_device("/home/user/.bashrc"))
def test_read_file_tool_rejects_device(self):
"""read_file_tool returns an error without any file I/O."""
result = json.loads(read_file_tool("/dev/zero", task_id="dev_test"))
self.assertIn("error", result)
self.assertIn("device file", result["error"])
# ---------------------------------------------------------------------------
# Character-count limits
# ---------------------------------------------------------------------------
class TestCharacterCountGuard(unittest.TestCase):
"""Large reads should be rejected with guidance to use offset/limit."""
def setUp(self):
_read_tracker.clear()
def tearDown(self):
_read_tracker.clear()
@patch("tools.file_tools._get_file_ops")
@patch("tools.file_tools._get_max_read_chars", return_value=_DEFAULT_MAX_READ_CHARS)
def test_oversized_read_rejected(self, _mock_limit, mock_ops):
"""A read that returns >max chars is rejected."""
big_content = "x" * (_DEFAULT_MAX_READ_CHARS + 1)
mock_ops.return_value = _make_fake_ops(
content=big_content,
total_lines=5000,
file_size=len(big_content) + 100, # bigger than content
)
result = json.loads(read_file_tool("/tmp/huge.txt", task_id="big"))
self.assertIn("error", result)
self.assertIn("safety limit", result["error"])
self.assertIn("offset and limit", result["error"])
self.assertIn("total_lines", result)
@patch("tools.file_tools._get_file_ops")
def test_small_read_not_rejected(self, mock_ops):
"""Normal-sized reads pass through fine."""
mock_ops.return_value = _make_fake_ops(content="short\n", file_size=6)
result = json.loads(read_file_tool("/tmp/small.txt", task_id="small"))
self.assertNotIn("error", result)
self.assertIn("content", result)
@patch("tools.file_tools._get_file_ops")
@patch("tools.file_tools._get_max_read_chars", return_value=_DEFAULT_MAX_READ_CHARS)
def test_content_under_limit_passes(self, _mock_limit, mock_ops):
"""Content just under the limit should pass through fine."""
mock_ops.return_value = _make_fake_ops(
content="y" * (_DEFAULT_MAX_READ_CHARS - 1),
file_size=_DEFAULT_MAX_READ_CHARS - 1,
)
result = json.loads(read_file_tool("/tmp/justunder.txt", task_id="under"))
self.assertNotIn("error", result)
self.assertIn("content", result)
# ---------------------------------------------------------------------------
# File deduplication
# ---------------------------------------------------------------------------
class TestFileDedup(unittest.TestCase):
"""Re-reading an unchanged file should return a lightweight stub."""
def setUp(self):
_read_tracker.clear()
self._tmpdir = tempfile.mkdtemp()
self._tmpfile = os.path.join(self._tmpdir, "dedup_test.txt")
with open(self._tmpfile, "w") as f:
f.write("line one\nline two\n")
def tearDown(self):
_read_tracker.clear()
try:
os.unlink(self._tmpfile)
os.rmdir(self._tmpdir)
except OSError:
pass
@patch("tools.file_tools._get_file_ops")
def test_second_read_returns_dedup_stub(self, mock_ops):
"""Second read of same file+range returns non-content dedup status."""
mock_ops.return_value = _make_fake_ops(
content="line one\nline two\n", file_size=20,
)
# First read — full content
r1 = json.loads(read_file_tool(self._tmpfile, task_id="dup"))
self.assertNotIn("dedup", r1)
# Second read — should get dedup stub
r2 = json.loads(read_file_tool(self._tmpfile, task_id="dup"))
self.assertTrue(r2.get("dedup"), "Second read should return dedup stub")
self.assertEqual(r2.get("status"), "unchanged")
self.assertIn("unchanged", r2.get("message", ""))
self.assertFalse(r2.get("content_returned"))
self.assertNotIn("content", r2)
@patch("tools.file_tools._get_file_ops")
def test_write_rejects_internal_read_status_text(self, mock_ops):
"""write_file must not persist internal read_file status text."""
fake = MagicMock()
fake.write_file = MagicMock()
mock_ops.return_value = fake
result = json.loads(write_file_tool(
self._tmpfile,
_READ_DEDUP_STATUS_MESSAGE,
task_id="guard",
))
self.assertIn("error", result)
self.assertIn("internal read_file status text", result["error"])
fake.write_file.assert_not_called()
@patch("tools.file_tools._get_file_ops")
def test_write_rejects_status_text_with_small_framing(self, mock_ops):
"""write_file rejects small wrappers around the status text too.
Real-world corruption shapes aren't always the verbatim message — the
model sometimes prepends a short note or appends a trailing comment
before calling write_file. A short, status-dominated write is still
corruption, not legitimate file content.
"""
fake = MagicMock()
fake.write_file = MagicMock()
mock_ops.return_value = fake
wrapped = "Note: " + _READ_DEDUP_STATUS_MESSAGE + "\n\n(continuing.)"
result = json.loads(write_file_tool(
self._tmpfile,
wrapped,
task_id="guard",
))
self.assertIn("error", result)
self.assertIn("internal read_file status text", result["error"])
fake.write_file.assert_not_called()
@patch("tools.file_tools._get_file_ops")
def test_write_allows_large_file_that_quotes_status_text(self, mock_ops):
"""Legitimate large content that happens to quote the status is allowed.
Hermes' own docs / SKILL.md files may legitimately mention the dedup
message verbatim. Only short, status-dominated writes are rejected —
a normal file that contains the message as one line out of many must
still write successfully.
"""
fake = MagicMock()
fake.write_file = lambda path, content: MagicMock(
to_dict=lambda: {"success": True, "path": path}
)
mock_ops.return_value = fake
# Build content that contains the status text but is much larger,
# so the status doesn't "dominate" — this is a legitimate file.
large_content = (
"# Skill reference\n\n"
"Example internal message (do not write back):\n\n"
f" {_READ_DEDUP_STATUS_MESSAGE}\n\n"
+ ("This is documentation content. " * 200)
)
result = json.loads(write_file_tool(
self._tmpfile,
large_content,
task_id="guard",
))
self.assertNotIn("error", result)
self.assertTrue(result.get("success"))
@patch("tools.file_tools._get_file_ops")
def test_modified_file_not_deduped(self, mock_ops):
"""After the file is modified, dedup returns full content."""
mock_ops.return_value = _make_fake_ops(
content="line one\nline two\n", file_size=20,
)
read_file_tool(self._tmpfile, task_id="mod")
# Modify the file — ensure mtime changes
time.sleep(0.05)
with open(self._tmpfile, "w") as f:
f.write("changed content\n")
r2 = json.loads(read_file_tool(self._tmpfile, task_id="mod"))
self.assertNotEqual(r2.get("dedup"), True, "Modified file should not dedup")
@patch("tools.file_tools._get_file_ops")
def test_different_range_not_deduped(self, mock_ops):
"""Same file but different offset/limit should not dedup."""
mock_ops.return_value = _make_fake_ops(
content="line one\nline two\n", file_size=20,
)
read_file_tool(self._tmpfile, offset=1, limit=500, task_id="rng")
r2 = json.loads(read_file_tool(
self._tmpfile, offset=10, limit=500, task_id="rng",
))
self.assertNotEqual(r2.get("dedup"), True)
@patch("tools.file_tools._get_file_ops")
def test_different_task_not_deduped(self, mock_ops):
"""Different task_ids have separate dedup caches."""
mock_ops.return_value = _make_fake_ops(
content="line one\nline two\n", file_size=20,
)
read_file_tool(self._tmpfile, task_id="task_a")
r2 = json.loads(read_file_tool(self._tmpfile, task_id="task_b"))
self.assertNotEqual(r2.get("dedup"), True)
# ---------------------------------------------------------------------------
# Dedup stub-loop guard (issue #15759)
# ---------------------------------------------------------------------------
class TestDedupStubLoopGuard(unittest.TestCase):
"""Repeated dedup stubs must escalate to a hard BLOCKED error so weak
tool-following models don't burn iteration budget in an infinite loop
of ``read_file → stub → read_file → stub → ...``"""
def setUp(self):
_read_tracker.clear()
self._tmpdir = tempfile.mkdtemp()
self._tmpfile = os.path.join(self._tmpdir, "loop_test.txt")
with open(self._tmpfile, "w") as f:
f.write("line one\nline two\n")
def tearDown(self):
_read_tracker.clear()
try:
os.unlink(self._tmpfile)
os.rmdir(self._tmpdir)
except OSError:
pass
@patch("tools.file_tools._get_file_ops")
def test_third_read_is_blocked(self, mock_ops):
"""read → stub → BLOCKED. Second stub escalates to hard error."""
mock_ops.return_value = _make_fake_ops(
content="line one\nline two\n", file_size=20,
)
# 1. Real read — full content
r1 = json.loads(read_file_tool(self._tmpfile, task_id="loop"))
self.assertNotIn("dedup", r1)
self.assertNotIn("error", r1)
# 2. Dedup stub (first hit)
r2 = json.loads(read_file_tool(self._tmpfile, task_id="loop"))
self.assertTrue(r2.get("dedup"))
self.assertNotIn("error", r2)
# 3. Dedup stub (second hit) — escalates to BLOCKED
r3 = json.loads(read_file_tool(self._tmpfile, task_id="loop"))
self.assertIn("error", r3, "Second dedup stub should be BLOCKED")
self.assertIn("BLOCKED", r3["error"])
self.assertIn("STOP", r3["error"])
self.assertEqual(r3.get("already_read"), 3)
# The loop-breaker must NOT be a dedup stub, or the model sees the
# same passive message it has been ignoring.
self.assertNotIn("dedup", r3)
@patch("tools.file_tools._get_file_ops")
def test_subsequent_reads_stay_blocked(self, mock_ops):
"""Once blocked, continued hammering keeps returning BLOCKED."""
mock_ops.return_value = _make_fake_ops(
content="line one\nline two\n", file_size=20,
)
read_file_tool(self._tmpfile, task_id="loop") # read
read_file_tool(self._tmpfile, task_id="loop") # stub
r3 = json.loads(read_file_tool(self._tmpfile, task_id="loop"))
self.assertIn("error", r3)
# 4th, 5th, ... calls must stay blocked, never revert to stub
for _ in range(5):
rN = json.loads(read_file_tool(self._tmpfile, task_id="loop"))
self.assertIn("error", rN)
self.assertIn("BLOCKED", rN["error"])
@patch("tools.file_tools._get_file_ops")
def test_file_modification_clears_block(self, mock_ops):
"""Real file change should break out of the block — new content
is legitimately different and the agent should see it."""
mock_ops.return_value = _make_fake_ops(
content="line one\nline two\n", file_size=20,
)
read_file_tool(self._tmpfile, task_id="loop")
read_file_tool(self._tmpfile, task_id="loop")
r3 = json.loads(read_file_tool(self._tmpfile, task_id="loop"))
self.assertIn("error", r3)
# File changes — mtime updates
time.sleep(0.05)
with open(self._tmpfile, "w") as f:
f.write("brand new content\n")
r4 = json.loads(read_file_tool(self._tmpfile, task_id="loop"))
self.assertNotIn("error", r4)
self.assertNotIn("dedup", r4)
@patch("tools.file_tools._get_file_ops")
def test_other_tool_call_clears_hits(self, mock_ops):
"""An intervening non-read tool call resets stub-hit counters,
just like it resets the consecutive-read counter."""
mock_ops.return_value = _make_fake_ops(
content="line one\nline two\n", file_size=20,
)
read_file_tool(self._tmpfile, task_id="loop")
read_file_tool(self._tmpfile, task_id="loop") # 1st stub
# Agent did something else — e.g. terminal, write_file — so the
# stub-loop is broken. Counter should reset.
notify_other_tool_call("loop")
r3 = json.loads(read_file_tool(self._tmpfile, task_id="loop"))
# Should be a stub again, NOT blocked
self.assertTrue(r3.get("dedup"))
self.assertNotIn("error", r3)
@patch("tools.file_tools._get_file_ops")
def test_different_ranges_tracked_independently(self, mock_ops):
"""Stub-hit counter is keyed by (path, offset, limit), so hammering
one range shouldn't block reads of a different range."""
mock_ops.return_value = _make_fake_ops(
content="line one\nline two\n", file_size=20,
)
# Burn down one range
read_file_tool(self._tmpfile, offset=1, limit=100, task_id="loop")
read_file_tool(self._tmpfile, offset=1, limit=100, task_id="loop")
r3 = json.loads(read_file_tool(
self._tmpfile, offset=1, limit=100, task_id="loop",
))
self.assertIn("error", r3)
# Different range — fresh read, should go through
r_other = json.loads(read_file_tool(
self._tmpfile, offset=1, limit=200, task_id="loop",
))
self.assertNotIn("error", r_other)
@patch("tools.file_tools._get_file_ops")
def test_reset_file_dedup_clears_hits(self, mock_ops):
"""Post-compression reset must clear stub-hit counters too,
otherwise the agent stays blocked after compression."""
mock_ops.return_value = _make_fake_ops(
content="line one\nline two\n", file_size=20,
)
read_file_tool(self._tmpfile, task_id="loop")
read_file_tool(self._tmpfile, task_id="loop")
r3 = json.loads(read_file_tool(self._tmpfile, task_id="loop"))
self.assertIn("error", r3)
reset_file_dedup("loop")
# Fresh session — real read, no stub, no block
r4 = json.loads(read_file_tool(self._tmpfile, task_id="loop"))
self.assertNotIn("error", r4)
self.assertNotIn("dedup", r4)
# ---------------------------------------------------------------------------
# Dedup reset on compression
# ---------------------------------------------------------------------------
class TestDedupResetOnCompression(unittest.TestCase):
"""reset_file_dedup should clear the dedup cache so post-compression
reads return full content."""
def setUp(self):
_read_tracker.clear()
self._tmpdir = tempfile.mkdtemp()
self._tmpfile = os.path.join(self._tmpdir, "compress_test.txt")
with open(self._tmpfile, "w") as f:
f.write("original content\n")
def tearDown(self):
_read_tracker.clear()
try:
os.unlink(self._tmpfile)
os.rmdir(self._tmpdir)
except OSError:
pass
@patch("tools.file_tools._get_file_ops")
def test_reset_clears_dedup(self, mock_ops):
"""After reset_file_dedup, the same read returns full content."""
mock_ops.return_value = _make_fake_ops(
content="original content\n", file_size=18,
)
# First read — populates dedup cache
read_file_tool(self._tmpfile, task_id="comp")
# Verify dedup works before reset
r_dedup = json.loads(read_file_tool(self._tmpfile, task_id="comp"))
self.assertTrue(r_dedup.get("dedup"), "Should dedup before reset")
# Simulate compression
reset_file_dedup("comp")
# Read again — should get full content
r_post = json.loads(read_file_tool(self._tmpfile, task_id="comp"))
self.assertNotEqual(r_post.get("dedup"), True,
"Post-compression read should return full content")
@patch("tools.file_tools._get_file_ops")
def test_reset_all_tasks(self, mock_ops):
"""reset_file_dedup(None) clears all tasks."""
mock_ops.return_value = _make_fake_ops(
content="original content\n", file_size=18,
)
read_file_tool(self._tmpfile, task_id="t1")
read_file_tool(self._tmpfile, task_id="t2")
reset_file_dedup() # no task_id — clear all
r1 = json.loads(read_file_tool(self._tmpfile, task_id="t1"))
r2 = json.loads(read_file_tool(self._tmpfile, task_id="t2"))
self.assertNotEqual(r1.get("dedup"), True)
self.assertNotEqual(r2.get("dedup"), True)
@patch("tools.file_tools._get_file_ops")
def test_reset_preserves_loop_detection(self, mock_ops):
"""reset_file_dedup does NOT affect the consecutive-read counter."""
mock_ops.return_value = _make_fake_ops(
content="original content\n", file_size=18,
)
# Build up consecutive count (read 1 and 2)
read_file_tool(self._tmpfile, task_id="loop")
# 2nd read is deduped — doesn't increment consecutive counter
read_file_tool(self._tmpfile, task_id="loop")
reset_file_dedup("loop")
# 3rd read — counter should still be at 2 from before reset
# (dedup was hit for read 2, but consecutive counter was 1 for that)
# After reset, this read goes through full path, incrementing to 2
r3 = json.loads(read_file_tool(self._tmpfile, task_id="loop"))
# Should NOT be blocked or warned — counter restarted since dedup
# intercepted reads before they reached the counter
self.assertNotIn("error", r3)
# ---------------------------------------------------------------------------
# Large-file hint
# ---------------------------------------------------------------------------
class TestLargeFileHint(unittest.TestCase):
"""Large truncated files should include a hint about targeted reads."""
def setUp(self):
_read_tracker.clear()
def tearDown(self):
_read_tracker.clear()
@patch("tools.file_tools._get_file_ops")
def test_large_truncated_file_gets_hint(self, mock_ops):
content = "line\n" * 400 # 2000 chars, small enough to pass char guard
fake = _make_fake_ops(content=content, total_lines=10000, file_size=600_000)
# Make to_dict return truncated=True
orig_read = fake.read_file
def patched_read(path, offset=1, limit=500):
r = orig_read(path, offset, limit)
orig_to_dict = r.to_dict
def new_to_dict():
d = orig_to_dict()
d["truncated"] = True
return d
r.to_dict = new_to_dict
return r
fake.read_file = patched_read
mock_ops.return_value = fake
result = json.loads(read_file_tool("/tmp/bigfile.log", task_id="hint"))
self.assertIn("_hint", result)
self.assertIn("section you need", result["_hint"])
# ---------------------------------------------------------------------------
# Config override
# ---------------------------------------------------------------------------
class TestConfigOverride(unittest.TestCase):
"""file_read_max_chars in config.yaml should control the char guard."""
def setUp(self):
_read_tracker.clear()
# Reset the cached value so each test gets a fresh lookup
import tools.file_tools as _ft
_ft._max_read_chars_cached = None
def tearDown(self):
_read_tracker.clear()
import tools.file_tools as _ft
_ft._max_read_chars_cached = None
@patch("tools.file_tools._get_file_ops")
@patch("hermes_cli.config.load_config", return_value={"file_read_max_chars": 50})
def test_custom_config_lowers_limit(self, _mock_cfg, mock_ops):
"""A config value of 50 should reject reads over 50 chars."""
mock_ops.return_value = _make_fake_ops(content="x" * 60, file_size=60)
result = json.loads(read_file_tool("/tmp/cfgtest.txt", task_id="cfg1"))
self.assertIn("error", result)
self.assertIn("safety limit", result["error"])
self.assertIn("50", result["error"]) # should show the configured limit
@patch("tools.file_tools._get_file_ops")
@patch("hermes_cli.config.load_config", return_value={"file_read_max_chars": 500_000})
def test_custom_config_raises_limit(self, _mock_cfg, mock_ops):
"""A config value of 500K should allow reads up to 500K chars."""
# 200K chars would be rejected at the default 100K but passes at 500K
mock_ops.return_value = _make_fake_ops(
content="y" * 200_000, file_size=200_000,
)
result = json.loads(read_file_tool("/tmp/cfgtest2.txt", task_id="cfg2"))
self.assertNotIn("error", result)
self.assertIn("content", result)
# ---------------------------------------------------------------------------
# Write invalidates dedup cache (fixes #13144)
# ---------------------------------------------------------------------------
class TestWriteInvalidatesDedup(unittest.TestCase):
"""write_file_tool and patch_tool must invalidate the read_file dedup
cache for the written path. Without this, a read→write→read sequence
within the same mtime second returns a stale 'File unchanged' stub.
Regression test for https://github.com/NousResearch/hermes-agent/issues/13144
"""
def setUp(self):
_read_tracker.clear()
self._tmpdir = tempfile.mkdtemp()
self._tmpfile = os.path.join(self._tmpdir, "write_dedup.txt")
with open(self._tmpfile, "w") as f:
f.write("original content\n")
def tearDown(self):
_read_tracker.clear()
try:
os.unlink(self._tmpfile)
os.rmdir(self._tmpdir)
except OSError:
pass
@patch("tools.file_tools._get_file_ops")
def test_write_invalidates_dedup_same_second(self, mock_ops):
"""read→write→read within the same mtime second returns fresh content.
This is the core #13144 scenario: on filesystems with ≥1ms mtime
granularity, a write that lands in the same timestamp as the prior
read would previously cause the second read to return a stale dedup
stub because the mtime comparison saw no change.
"""
fake = MagicMock()
fake.read_file = lambda path, offset=1, limit=500: _FakeReadResult(
content="original content\n", total_lines=1, file_size=18,
)
fake.write_file = lambda path, content: MagicMock(
to_dict=lambda: {"success": True, "path": path}
)
mock_ops.return_value = fake
# 1. Read — populates dedup cache.
r1 = json.loads(read_file_tool(self._tmpfile, task_id="wr"))
self.assertNotEqual(r1.get("dedup"), True)
# 2. Write — must invalidate dedup for this path.
# (No sleep — we intentionally stay in the same mtime second.)
write_file_tool(self._tmpfile, "new content\n", task_id="wr")
# 3. Read again — should get full content, NOT dedup stub.
fake.read_file = lambda path, offset=1, limit=500: _FakeReadResult(
content="new content\n", total_lines=1, file_size=13,
)
r2 = json.loads(read_file_tool(self._tmpfile, task_id="wr"))
self.assertNotEqual(r2.get("dedup"), True,
"read after write must not return dedup stub")
self.assertIn("content", r2)
@patch("tools.file_tools._get_file_ops")
def test_write_invalidates_all_offsets(self, mock_ops):
"""A write invalidates dedup entries for ALL offset/limit combos."""
fake = MagicMock()
fake.read_file = lambda path, offset=1, limit=500: _FakeReadResult(
content="line1\nline2\nline3\n", total_lines=3, file_size=20,
)
fake.write_file = lambda path, content: MagicMock(
to_dict=lambda: {"success": True, "path": path}
)
mock_ops.return_value = fake
# Read with different offsets to populate multiple dedup entries.
read_file_tool(self._tmpfile, offset=1, limit=100, task_id="off")
read_file_tool(self._tmpfile, offset=50, limit=100, task_id="off")
# Write — should invalidate BOTH dedup entries.
write_file_tool(self._tmpfile, "replaced\n", task_id="off")
# Both reads should return fresh content.
r1 = json.loads(read_file_tool(self._tmpfile, offset=1, limit=100, task_id="off"))
r2 = json.loads(read_file_tool(self._tmpfile, offset=50, limit=100, task_id="off"))
self.assertNotEqual(r1.get("dedup"), True,
"offset=1 should not dedup after write")
self.assertNotEqual(r2.get("dedup"), True,
"offset=50 should not dedup after write")
@patch("tools.file_tools._get_file_ops")
def test_write_does_not_invalidate_other_files(self, mock_ops):
"""Writing file A should not invalidate dedup for file B."""
other = os.path.join(self._tmpdir, "other.txt")
with open(other, "w") as f:
f.write("other content\n")
fake = MagicMock()
fake.read_file = lambda path, offset=1, limit=500: _FakeReadResult(
content="other content\n", total_lines=1, file_size=15,
)
fake.write_file = lambda path, content: MagicMock(
to_dict=lambda: {"success": True, "path": path}
)
mock_ops.return_value = fake
# Read file B.
read_file_tool(other, task_id="iso")
# Write file A.
write_file_tool(self._tmpfile, "changed A\n", task_id="iso")
# File B should still dedup (untouched).
r2 = json.loads(read_file_tool(other, task_id="iso"))
self.assertTrue(r2.get("dedup"),
"Unrelated file should still dedup after writing another file")
try:
os.unlink(other)
except OSError:
pass
@patch("tools.file_tools._get_file_ops")
def test_write_does_not_invalidate_other_tasks(self, mock_ops):
"""Writing in task A should not invalidate dedup for task B."""
fake = MagicMock()
fake.read_file = lambda path, offset=1, limit=500: _FakeReadResult(
content="original content\n", total_lines=1, file_size=18,
)
fake.write_file = lambda path, content: MagicMock(
to_dict=lambda: {"success": True, "path": path}
)
mock_ops.return_value = fake
# Both tasks read the file.
read_file_tool(self._tmpfile, task_id="taskA")
read_file_tool(self._tmpfile, task_id="taskB")
# Task A writes.
write_file_tool(self._tmpfile, "new\n", task_id="taskA")
# Task A's dedup should be invalidated.
rA = json.loads(read_file_tool(self._tmpfile, task_id="taskA"))
self.assertNotEqual(rA.get("dedup"), True,
"Writing task's dedup should be invalidated")
# Task B still sees dedup (its cache is separate — the file
# *may* have changed on disk, but mtime comparison handles that;
# here we test that invalidation is scoped to the writing task).
# Note: on real FS, task B's dedup might or might not hit depending
# on mtime. The point is that _invalidate_dedup_for_path is
# correctly scoped to task_id.
def test_invalidate_dedup_for_path_noop_on_missing_task(self):
"""_invalidate_dedup_for_path is safe when task_id doesn't exist."""
_read_tracker.clear()
# Should not raise.
_invalidate_dedup_for_path("/nonexistent/path", "no_such_task")
def test_invalidate_dedup_for_path_noop_on_empty_dedup(self):
"""_invalidate_dedup_for_path is safe when dedup dict is empty."""
_read_tracker.clear()
_read_tracker["t"] = {
"last_key": None, "consecutive": 0,
"read_history": set(), "dedup": {},
}
_invalidate_dedup_for_path("/some/path", "t")
self.assertEqual(_read_tracker["t"]["dedup"], {})
if __name__ == "__main__":
unittest.main()