fix: add upstream guard for non-dict function_args + tests for build_tool_preview

Complements PR #453 by 0xbyt4. Adds isinstance(dict) guard in
run_agent.py to catch cases where json.loads returns non-dict
(e.g. null, list, string) before they reach downstream code.

Also adds 15 tests for build_tool_preview covering None args,
empty dicts, known/unknown tools, fallback keys, truncation,
and all special-cased tools (process, todo, memory, session_search).
This commit is contained in:
teknium1
2026-03-09 21:01:40 -07:00
3 changed files with 132 additions and 7 deletions

View File

@@ -23,6 +23,7 @@ import stat
import base64
import hashlib
import subprocess
import threading
import time
import uuid
import webbrowser
@@ -44,6 +45,10 @@ try:
import fcntl
except Exception:
fcntl = None
try:
import msvcrt
except Exception:
msvcrt = None
# =============================================================================
# Constants
@@ -299,31 +304,64 @@ def _auth_lock_path() -> Path:
return _auth_file_path().with_suffix(".lock")
_auth_lock_holder = threading.local()
@contextmanager
def _auth_store_lock(timeout_seconds: float = AUTH_LOCK_TIMEOUT_SECONDS):
"""Cross-process advisory lock for auth.json reads+writes."""
"""Cross-process advisory lock for auth.json reads+writes. Reentrant."""
# Reentrant: if this thread already holds the lock, just yield.
if getattr(_auth_lock_holder, "depth", 0) > 0:
_auth_lock_holder.depth += 1
try:
yield
finally:
_auth_lock_holder.depth -= 1
return
lock_path = _auth_lock_path()
lock_path.parent.mkdir(parents=True, exist_ok=True)
with lock_path.open("a+") as lock_file:
if fcntl is None:
if fcntl is None and msvcrt is None:
_auth_lock_holder.depth = 1
try:
yield
finally:
_auth_lock_holder.depth = 0
return
# On Windows, msvcrt.locking needs the file to have content and the
# file pointer at position 0. Ensure the lock file has at least 1 byte.
if msvcrt and (not lock_path.exists() or lock_path.stat().st_size == 0):
lock_path.write_text(" ", encoding="utf-8")
with lock_path.open("r+" if msvcrt else "a+") as lock_file:
deadline = time.time() + max(1.0, timeout_seconds)
while True:
try:
if fcntl:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
else:
lock_file.seek(0)
msvcrt.locking(lock_file.fileno(), msvcrt.LK_NBLCK, 1)
break
except BlockingIOError:
except (BlockingIOError, OSError, PermissionError):
if time.time() >= deadline:
raise TimeoutError("Timed out waiting for auth store lock")
time.sleep(0.05)
_auth_lock_holder.depth = 1
try:
yield
finally:
_auth_lock_holder.depth = 0
if fcntl:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
elif msvcrt:
try:
lock_file.seek(0)
msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)
except (OSError, IOError):
pass
def _load_auth_store(auth_file: Optional[Path] = None) -> Dict[str, Any]:

View File

@@ -2689,6 +2689,8 @@ class AIAgent:
except json.JSONDecodeError as e:
logging.warning(f"Unexpected JSON error after validation: {e}")
function_args = {}
if not isinstance(function_args, dict):
function_args = {}
if not self.quiet_mode:
args_str = json.dumps(function_args, ensure_ascii=False)

85
tests/test_display.py Normal file
View File

@@ -0,0 +1,85 @@
"""Tests for agent/display.py — build_tool_preview()."""
import pytest
from agent.display import build_tool_preview
class TestBuildToolPreview:
"""Tests for build_tool_preview defensive handling and normal operation."""
def test_none_args_returns_none(self):
"""PR #453: None args should not crash, should return None."""
assert build_tool_preview("terminal", None) is None
def test_empty_dict_returns_none(self):
"""Empty dict has no keys to preview."""
assert build_tool_preview("terminal", {}) is None
def test_known_tool_with_primary_arg(self):
"""Known tool with its primary arg should return a preview string."""
result = build_tool_preview("terminal", {"command": "ls -la"})
assert result is not None
assert "ls -la" in result
def test_web_search_preview(self):
result = build_tool_preview("web_search", {"query": "hello world"})
assert result is not None
assert "hello world" in result
def test_read_file_preview(self):
result = build_tool_preview("read_file", {"path": "/tmp/test.py", "offset": 1})
assert result is not None
assert "/tmp/test.py" in result
def test_unknown_tool_with_fallback_key(self):
"""Unknown tool but with a recognized fallback key should still preview."""
result = build_tool_preview("custom_tool", {"query": "test query"})
assert result is not None
assert "test query" in result
def test_unknown_tool_no_matching_key(self):
"""Unknown tool with no recognized keys should return None."""
result = build_tool_preview("custom_tool", {"foo": "bar"})
assert result is None
def test_long_value_truncated(self):
"""Preview should truncate long values."""
long_cmd = "a" * 100
result = build_tool_preview("terminal", {"command": long_cmd}, max_len=40)
assert result is not None
assert len(result) <= 43 # max_len + "..."
def test_process_tool_with_none_args(self):
"""Process tool special case should also handle None args."""
assert build_tool_preview("process", None) is None
def test_process_tool_normal(self):
result = build_tool_preview("process", {"action": "poll", "session_id": "abc123"})
assert result is not None
assert "poll" in result
def test_todo_tool_read(self):
result = build_tool_preview("todo", {"merge": False})
assert result is not None
assert "reading" in result
def test_todo_tool_with_todos(self):
result = build_tool_preview("todo", {"todos": [{"id": "1", "content": "test", "status": "pending"}]})
assert result is not None
assert "1 task" in result
def test_memory_tool_add(self):
result = build_tool_preview("memory", {"action": "add", "target": "user", "content": "test note"})
assert result is not None
assert "user" in result
def test_session_search_preview(self):
result = build_tool_preview("session_search", {"query": "find something"})
assert result is not None
assert "find something" in result
def test_false_like_args_zero(self):
"""Non-dict falsy values should return None, not crash."""
assert build_tool_preview("terminal", 0) is None
assert build_tool_preview("terminal", "") is None
assert build_tool_preview("terminal", []) is None