mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 15:01:34 +08:00
Compare commits
1 Commits
codex-port
...
claude-cod
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
08b97660c5 |
@@ -267,13 +267,19 @@ class ContextCompressor:
|
||||
|
||||
return "\n\n".join(parts)
|
||||
|
||||
def _generate_summary(self, turns_to_summarize: List[Dict[str, Any]]) -> Optional[str]:
|
||||
def _generate_summary(self, turns_to_summarize: List[Dict[str, Any]], focus_topic: str = None) -> Optional[str]:
|
||||
"""Generate a structured summary of conversation turns.
|
||||
|
||||
Uses a structured template (Goal, Progress, Decisions, Files, Next Steps)
|
||||
inspired by Pi-mono and OpenCode. When a previous summary exists,
|
||||
generates an iterative update instead of summarizing from scratch.
|
||||
|
||||
Args:
|
||||
focus_topic: Optional focus string for guided compression. When
|
||||
provided, the summariser prioritises preserving information
|
||||
related to this topic and is more aggressive about compressing
|
||||
everything else. Inspired by Claude Code's ``/compact``.
|
||||
|
||||
Returns None if all attempts fail — the caller should drop
|
||||
the middle turns without a summary rather than inject a useless
|
||||
placeholder.
|
||||
@@ -375,6 +381,14 @@ Target ~{summary_budget} tokens. Be specific — include file paths, command out
|
||||
|
||||
Write only the summary body. Do not include any preamble or prefix."""
|
||||
|
||||
# Inject focus topic guidance when the user provides one via /compress <focus>.
|
||||
# This goes at the end of the prompt so it takes precedence.
|
||||
if focus_topic:
|
||||
prompt += f"""
|
||||
|
||||
FOCUS TOPIC: "{focus_topic}"
|
||||
The user has requested that this compaction PRIORITISE preserving all information related to the focus topic above. For content related to "{focus_topic}", include full detail — exact values, file paths, command outputs, error messages, and decisions. For content NOT related to the focus topic, summarise more aggressively (brief one-liners or omit if truly irrelevant). The focus topic sections should receive roughly 60-70% of the summary token budget."""
|
||||
|
||||
try:
|
||||
call_kwargs = {
|
||||
"task": "compression",
|
||||
@@ -592,7 +606,7 @@ Write only the summary body. Do not include any preamble or prefix."""
|
||||
# Main compression entry point
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def compress(self, messages: List[Dict[str, Any]], current_tokens: int = None) -> List[Dict[str, Any]]:
|
||||
def compress(self, messages: List[Dict[str, Any]], current_tokens: int = None, focus_topic: str = None) -> List[Dict[str, Any]]:
|
||||
"""Compress conversation messages by summarizing middle turns.
|
||||
|
||||
Algorithm:
|
||||
@@ -604,6 +618,12 @@ Write only the summary body. Do not include any preamble or prefix."""
|
||||
|
||||
After compression, orphaned tool_call / tool_result pairs are cleaned
|
||||
up so the API never receives mismatched IDs.
|
||||
|
||||
Args:
|
||||
focus_topic: Optional focus string for guided compression. When
|
||||
provided, the summariser will prioritise preserving information
|
||||
related to this topic and be more aggressive about compressing
|
||||
everything else. Inspired by Claude Code's ``/compact``.
|
||||
"""
|
||||
n_messages = len(messages)
|
||||
# Only need head + 3 tail messages minimum (token budget decides the real tail size)
|
||||
@@ -661,7 +681,7 @@ Write only the summary body. Do not include any preamble or prefix."""
|
||||
)
|
||||
|
||||
# Phase 3: Generate structured summary
|
||||
summary = self._generate_summary(turns_to_summarize)
|
||||
summary = self._generate_summary(turns_to_summarize, focus_topic=focus_topic)
|
||||
|
||||
# Phase 4: Assemble compressed message list
|
||||
compressed = []
|
||||
|
||||
220
cli.py
220
cli.py
@@ -4962,7 +4962,9 @@ class HermesCLI:
|
||||
elif canonical == "fast":
|
||||
self._handle_fast_command(cmd_original)
|
||||
elif canonical == "compress":
|
||||
self._manual_compress()
|
||||
self._manual_compress(cmd_original)
|
||||
elif canonical == "context":
|
||||
self._show_context_breakdown()
|
||||
elif canonical == "usage":
|
||||
self._show_usage()
|
||||
elif canonical == "insights":
|
||||
@@ -5818,8 +5820,14 @@ class HermesCLI:
|
||||
self._reasoning_preview_buf = getattr(self, "_reasoning_preview_buf", "") + reasoning_text
|
||||
self._flush_reasoning_preview(force=False)
|
||||
|
||||
def _manual_compress(self):
|
||||
"""Manually trigger context compression on the current conversation."""
|
||||
def _manual_compress(self, cmd_original: str = ""):
|
||||
"""Manually trigger context compression on the current conversation.
|
||||
|
||||
Accepts an optional focus topic: ``/compress <focus>`` guides the
|
||||
summariser to preserve information related to *focus* while being
|
||||
more aggressive about discarding everything else. Inspired by
|
||||
Claude Code's ``/compact <focus>`` feature.
|
||||
"""
|
||||
if not self.conversation_history or len(self.conversation_history) < 4:
|
||||
print("(._.) Not enough conversation to compress (need at least 4 messages).")
|
||||
return
|
||||
@@ -5832,16 +5840,28 @@ class HermesCLI:
|
||||
print("(._.) Compression is disabled in config.")
|
||||
return
|
||||
|
||||
# Extract optional focus topic from the command (e.g. "/compress database schema")
|
||||
focus_topic = ""
|
||||
if cmd_original:
|
||||
parts = cmd_original.strip().split(None, 1)
|
||||
if len(parts) > 1:
|
||||
focus_topic = parts[1].strip()
|
||||
|
||||
original_count = len(self.conversation_history)
|
||||
try:
|
||||
from agent.model_metadata import estimate_messages_tokens_rough
|
||||
approx_tokens = estimate_messages_tokens_rough(self.conversation_history)
|
||||
print(f"🗜️ Compressing {original_count} messages (~{approx_tokens:,} tokens)...")
|
||||
if focus_topic:
|
||||
print(f"🗜️ Compressing {original_count} messages (~{approx_tokens:,} tokens), "
|
||||
f"focus: \"{focus_topic}\"...")
|
||||
else:
|
||||
print(f"🗜️ Compressing {original_count} messages (~{approx_tokens:,} tokens)...")
|
||||
|
||||
compressed, _new_system = self.agent._compress_context(
|
||||
self.conversation_history,
|
||||
self.agent._cached_system_prompt or "",
|
||||
approx_tokens=approx_tokens,
|
||||
focus_topic=focus_topic or None,
|
||||
)
|
||||
self.conversation_history = compressed
|
||||
new_count = len(self.conversation_history)
|
||||
@@ -5854,6 +5874,198 @@ class HermesCLI:
|
||||
except Exception as e:
|
||||
print(f" ❌ Compression failed: {e}")
|
||||
|
||||
def _show_context_breakdown(self):
|
||||
"""Show a live breakdown of context window usage by component.
|
||||
|
||||
Inspired by Claude Code's ``/context`` command — gives users visibility
|
||||
into what is consuming their context window (system prompt, memory,
|
||||
skills, context files, conversation messages, tool results, etc.).
|
||||
"""
|
||||
if not self.agent:
|
||||
print("(._.) No active agent — send a message first.")
|
||||
return
|
||||
|
||||
from agent.model_metadata import (
|
||||
estimate_tokens_rough,
|
||||
estimate_messages_tokens_rough,
|
||||
)
|
||||
|
||||
agent = self.agent
|
||||
compressor = getattr(agent, "context_compressor", None)
|
||||
context_length = getattr(compressor, "context_length", 0) or 0
|
||||
if not context_length:
|
||||
from agent.model_metadata import get_model_context_length
|
||||
context_length = get_model_context_length(agent.model or "")
|
||||
|
||||
# ── System prompt breakdown ────────────────────────────────
|
||||
system_prompt = getattr(agent, "_cached_system_prompt", "") or ""
|
||||
system_total = estimate_tokens_rough(system_prompt)
|
||||
|
||||
# Attempt to break down the system prompt into its component layers.
|
||||
# The prompt is assembled by joining parts with "\n\n", so we can
|
||||
# identify known sections by their content signatures.
|
||||
components = []
|
||||
if system_prompt:
|
||||
from agent.prompt_builder import load_soul_md, DEFAULT_AGENT_IDENTITY
|
||||
# Identity block
|
||||
soul = load_soul_md()
|
||||
if soul and soul[:60] in system_prompt:
|
||||
identity_tokens = estimate_tokens_rough(soul)
|
||||
components.append((" Identity (SOUL.md)", identity_tokens))
|
||||
elif DEFAULT_AGENT_IDENTITY[:40] in system_prompt:
|
||||
identity_tokens = estimate_tokens_rough(DEFAULT_AGENT_IDENTITY)
|
||||
components.append((" Identity (built-in)", identity_tokens))
|
||||
|
||||
# Memory
|
||||
mem_store = getattr(agent, "_memory_store", None)
|
||||
if mem_store:
|
||||
mem_block = mem_store.format_for_system_prompt("memory")
|
||||
if mem_block and mem_block[:30] in system_prompt:
|
||||
components.append((" Memory", estimate_tokens_rough(mem_block)))
|
||||
user_block = mem_store.format_for_system_prompt("user")
|
||||
if user_block and user_block[:30] in system_prompt:
|
||||
components.append((" User profile", estimate_tokens_rough(user_block)))
|
||||
|
||||
# Skills
|
||||
skills_marker = "## Skills (mandatory)"
|
||||
if skills_marker in system_prompt:
|
||||
skills_start = system_prompt.index(skills_marker)
|
||||
# Find the next major section after skills
|
||||
_next_sections = ["\nConversation started:", "\nYou are running as"]
|
||||
skills_end = len(system_prompt)
|
||||
for _sect in _next_sections:
|
||||
idx = system_prompt.find(_sect, skills_start + 10)
|
||||
if idx != -1:
|
||||
skills_end = min(skills_end, idx)
|
||||
skills_text = system_prompt[skills_start:skills_end]
|
||||
components.append((" Skills index", estimate_tokens_rough(skills_text)))
|
||||
|
||||
# Context files (AGENTS.md, .cursorrules, etc.)
|
||||
ctx_marker = "# Project Context"
|
||||
if ctx_marker in system_prompt:
|
||||
ctx_start = system_prompt.index(ctx_marker)
|
||||
ctx_text = system_prompt[ctx_start:]
|
||||
# Trim to just the context files section
|
||||
for _end_mark in ["\nConversation started:", "\n## Skills"]:
|
||||
idx = ctx_text.find(_end_mark, 10)
|
||||
if idx != -1:
|
||||
ctx_text = ctx_text[:idx]
|
||||
break
|
||||
components.append((" Context files", estimate_tokens_rough(ctx_text)))
|
||||
|
||||
# Tool-use guidance, platform hints, timestamps — remainder
|
||||
accounted = sum(t for _, t in components)
|
||||
remainder = max(0, system_total - accounted)
|
||||
if remainder > 50:
|
||||
components.append((" Other (guidance, hints, timestamp)", remainder))
|
||||
|
||||
# ── Conversation breakdown ─────────────────────────────────
|
||||
msgs = self.conversation_history or []
|
||||
msg_counts = {"user": 0, "assistant": 0, "tool": 0, "system": 0}
|
||||
msg_tokens = {"user": 0, "assistant": 0, "tool": 0, "system": 0}
|
||||
tool_result_tokens = 0
|
||||
tool_call_tokens = 0
|
||||
compaction_summary_tokens = 0
|
||||
|
||||
from agent.context_compressor import SUMMARY_PREFIX, LEGACY_SUMMARY_PREFIX
|
||||
for msg in msgs:
|
||||
role = msg.get("role", "unknown")
|
||||
content = msg.get("content", "")
|
||||
content_str = str(content) if content else ""
|
||||
tokens = estimate_tokens_rough(content_str)
|
||||
|
||||
# Count tool_calls in assistant messages
|
||||
tool_calls = msg.get("tool_calls")
|
||||
if tool_calls:
|
||||
tc_str = str(tool_calls)
|
||||
tool_call_tokens += estimate_tokens_rough(tc_str)
|
||||
|
||||
if role in msg_counts:
|
||||
msg_counts[role] += 1
|
||||
msg_tokens[role] += tokens
|
||||
else:
|
||||
msg_counts.setdefault(role, 0)
|
||||
msg_tokens.setdefault(role, 0)
|
||||
msg_counts[role] += 1
|
||||
msg_tokens[role] += tokens
|
||||
|
||||
if role == "tool":
|
||||
tool_result_tokens += tokens
|
||||
|
||||
# Detect compaction summaries
|
||||
if content_str and (SUMMARY_PREFIX in content_str or LEGACY_SUMMARY_PREFIX in content_str):
|
||||
compaction_summary_tokens += tokens
|
||||
|
||||
conversation_total = estimate_messages_tokens_rough(msgs)
|
||||
|
||||
# ── Tool schemas ───────────────────────────────────────────
|
||||
tool_schemas_tokens = 0
|
||||
try:
|
||||
tool_schemas = getattr(agent, "_cached_tool_schemas", None)
|
||||
if tool_schemas:
|
||||
tool_schemas_tokens = estimate_tokens_rough(str(tool_schemas))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# ── Grand total ────────────────────────────────────────────
|
||||
grand_total = system_total + conversation_total + tool_schemas_tokens
|
||||
percent = round((grand_total / context_length) * 100) if context_length else 0
|
||||
|
||||
# ── Render ─────────────────────────────────────────────────
|
||||
def _bar(tokens, total, width=20):
|
||||
if total <= 0:
|
||||
return ""
|
||||
filled = max(0, min(width, round((tokens / total) * width)))
|
||||
return "█" * filled + "░" * (width - filled)
|
||||
|
||||
def _fmt(tokens):
|
||||
if tokens >= 1000:
|
||||
return f"{tokens / 1000:.1f}K"
|
||||
return str(tokens)
|
||||
|
||||
print()
|
||||
model_short = (agent.model or "unknown").split("/")[-1]
|
||||
print(f"◎ Context Window — {model_short}")
|
||||
print(f" {_bar(grand_total, context_length, 30)} {_fmt(grand_total)} / {_fmt(context_length)} tokens ({percent}%)")
|
||||
print()
|
||||
|
||||
# System prompt
|
||||
print(f" ◆ System Prompt {_fmt(system_total):>8}")
|
||||
for label, toks in components:
|
||||
print(f" {label:<28} {_fmt(toks):>8}")
|
||||
|
||||
# Tool schemas
|
||||
if tool_schemas_tokens:
|
||||
n_tools = len(tool_schemas) if tool_schemas else 0
|
||||
print(f" ◆ Tool Schemas ({n_tools} tools) {_fmt(tool_schemas_tokens):>8}")
|
||||
|
||||
# Conversation
|
||||
total_msgs = sum(msg_counts.values())
|
||||
print(f" ◆ Conversation ({total_msgs} msgs) {_fmt(conversation_total):>8}")
|
||||
if msg_counts.get("user", 0):
|
||||
print(f" User messages ({msg_counts['user']}) {_fmt(msg_tokens['user']):>8}")
|
||||
if msg_counts.get("assistant", 0):
|
||||
print(f" Assistant messages ({msg_counts['assistant']}) {_fmt(msg_tokens['assistant']):>8}")
|
||||
if msg_counts.get("tool", 0):
|
||||
print(f" Tool results ({msg_counts['tool']}) {_fmt(tool_result_tokens):>8}")
|
||||
if tool_call_tokens:
|
||||
print(f" Tool calls {_fmt(tool_call_tokens):>8}")
|
||||
if compaction_summary_tokens:
|
||||
print(f" Compaction summaries {_fmt(compaction_summary_tokens):>8}")
|
||||
|
||||
# Compression info
|
||||
compressions = getattr(compressor, "compression_count", 0) or 0
|
||||
if compressions:
|
||||
print(f"\n ⚙ Compressions this session: {compressions}")
|
||||
|
||||
# Threshold info
|
||||
if compressor:
|
||||
threshold = getattr(compressor, "threshold_tokens", 0) or 0
|
||||
if threshold:
|
||||
remaining = max(0, threshold - grand_total)
|
||||
print(f" ⚙ Auto-compress at: ~{_fmt(threshold)} tokens ({_fmt(remaining)} remaining)")
|
||||
print()
|
||||
|
||||
def _show_usage(self):
|
||||
"""Show rate limits (if available) and session token usage."""
|
||||
if not self.agent:
|
||||
|
||||
@@ -69,7 +69,10 @@ COMMAND_REGISTRY: list[CommandDef] = [
|
||||
args_hint="[name]"),
|
||||
CommandDef("branch", "Branch the current session (explore a different path)", "Session",
|
||||
aliases=("fork",), args_hint="[name]"),
|
||||
CommandDef("compress", "Manually compress conversation context", "Session"),
|
||||
CommandDef("compress", "Manually compress conversation context", "Session",
|
||||
args_hint="[focus topic]"),
|
||||
CommandDef("context", "Show live context window breakdown (token usage per component)",
|
||||
"Info", aliases=("ctx",)),
|
||||
CommandDef("rollback", "List or restore filesystem checkpoints", "Session",
|
||||
args_hint="[number]"),
|
||||
CommandDef("stop", "Kill all running background processes", "Session"),
|
||||
|
||||
12
run_agent.py
12
run_agent.py
@@ -6281,17 +6281,23 @@ class AIAgent:
|
||||
if messages and messages[-1].get("_flush_sentinel") == _sentinel:
|
||||
messages.pop()
|
||||
|
||||
def _compress_context(self, messages: list, system_message: str, *, approx_tokens: int = None, task_id: str = "default") -> tuple:
|
||||
def _compress_context(self, messages: list, system_message: str, *, approx_tokens: int = None, task_id: str = "default", focus_topic: str = None) -> tuple:
|
||||
"""Compress conversation context and split the session in SQLite.
|
||||
|
||||
Args:
|
||||
focus_topic: Optional focus string for guided compression — the
|
||||
summariser will prioritise preserving information related to
|
||||
this topic. Inspired by Claude Code's ``/compact <focus>``.
|
||||
|
||||
Returns:
|
||||
(compressed_messages, new_system_prompt) tuple
|
||||
"""
|
||||
_pre_msg_count = len(messages)
|
||||
logger.info(
|
||||
"context compression started: session=%s messages=%d tokens=~%s model=%s",
|
||||
"context compression started: session=%s messages=%d tokens=~%s model=%s focus=%r",
|
||||
self.session_id or "none", _pre_msg_count,
|
||||
f"{approx_tokens:,}" if approx_tokens else "unknown", self.model,
|
||||
focus_topic,
|
||||
)
|
||||
# Pre-compression memory flush: let the model save memories before they're lost
|
||||
self.flush_memories(messages, min_turns=0)
|
||||
@@ -6303,7 +6309,7 @@ class AIAgent:
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
compressed = self.context_compressor.compress(messages, current_tokens=approx_tokens)
|
||||
compressed = self.context_compressor.compress(messages, current_tokens=approx_tokens, focus_topic=focus_topic)
|
||||
|
||||
todo_snapshot = self._todo_store.format_for_injection()
|
||||
if todo_snapshot:
|
||||
|
||||
345
tests/cli/test_context_breakdown.py
Normal file
345
tests/cli/test_context_breakdown.py
Normal file
@@ -0,0 +1,345 @@
|
||||
"""Tests for /context command — live context window breakdown.
|
||||
|
||||
Inspired by Claude Code's /context feature.
|
||||
"""
|
||||
|
||||
import os
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_cli(tmp_path):
|
||||
"""Build a minimal HermesCLI stub with enough state for _show_context_breakdown."""
|
||||
from cli import HermesCLI
|
||||
|
||||
cli_obj = object.__new__(HermesCLI)
|
||||
# Minimal attrs expected by _show_context_breakdown
|
||||
cli_obj.agent = None
|
||||
cli_obj.conversation_history = []
|
||||
return cli_obj
|
||||
|
||||
|
||||
def _make_agent_stub(model="anthropic/claude-sonnet-4.6", system_prompt="You are Hermes.",
|
||||
context_length=200000, compression_count=0, threshold_tokens=160000,
|
||||
last_prompt_tokens=50000):
|
||||
"""Return a mock agent with attributes used by _show_context_breakdown."""
|
||||
agent = MagicMock()
|
||||
agent.model = model
|
||||
agent._cached_system_prompt = system_prompt
|
||||
agent.session_input_tokens = 1000
|
||||
agent.session_output_tokens = 500
|
||||
|
||||
compressor = MagicMock()
|
||||
compressor.context_length = context_length
|
||||
compressor.compression_count = compression_count
|
||||
compressor.threshold_tokens = threshold_tokens
|
||||
compressor.last_prompt_tokens = last_prompt_tokens
|
||||
agent.context_compressor = compressor
|
||||
|
||||
agent._memory_store = None
|
||||
agent._cached_tool_schemas = None
|
||||
return agent
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestContextBreakdown:
|
||||
"""Tests for _show_context_breakdown method."""
|
||||
|
||||
def test_no_agent(self, tmp_path, capsys):
|
||||
"""When no agent is active, prints a helpful message."""
|
||||
cli_obj = _make_cli(tmp_path)
|
||||
cli_obj._show_context_breakdown()
|
||||
out = capsys.readouterr().out
|
||||
assert "No active agent" in out
|
||||
|
||||
def test_basic_breakdown(self, tmp_path, capsys):
|
||||
"""Basic breakdown shows model, context bar, and section headers."""
|
||||
cli_obj = _make_cli(tmp_path)
|
||||
cli_obj.agent = _make_agent_stub()
|
||||
cli_obj.conversation_history = [
|
||||
{"role": "user", "content": "Hello"},
|
||||
{"role": "assistant", "content": "Hi there!"},
|
||||
]
|
||||
|
||||
cli_obj._show_context_breakdown()
|
||||
out = capsys.readouterr().out
|
||||
|
||||
# Model name should appear
|
||||
assert "claude-sonnet-4.6" in out
|
||||
# Section headers
|
||||
assert "System Prompt" in out
|
||||
assert "Conversation" in out
|
||||
# Token counts appear
|
||||
assert "tokens" in out
|
||||
|
||||
def test_shows_context_percentage(self, tmp_path, capsys):
|
||||
"""The context usage percentage is displayed."""
|
||||
cli_obj = _make_cli(tmp_path)
|
||||
cli_obj.agent = _make_agent_stub()
|
||||
cli_obj.conversation_history = []
|
||||
|
||||
cli_obj._show_context_breakdown()
|
||||
out = capsys.readouterr().out
|
||||
assert "%" in out
|
||||
|
||||
def test_shows_tool_schemas_when_present(self, tmp_path, capsys):
|
||||
"""When tool schemas are cached, their token count is shown."""
|
||||
cli_obj = _make_cli(tmp_path)
|
||||
agent = _make_agent_stub()
|
||||
agent._cached_tool_schemas = [
|
||||
{"name": "tool1", "description": "Does something", "parameters": {}},
|
||||
{"name": "tool2", "description": "Does another thing", "parameters": {}},
|
||||
]
|
||||
cli_obj.agent = agent
|
||||
cli_obj.conversation_history = []
|
||||
|
||||
cli_obj._show_context_breakdown()
|
||||
out = capsys.readouterr().out
|
||||
assert "Tool Schemas" in out
|
||||
assert "2 tools" in out
|
||||
|
||||
def test_shows_message_role_breakdown(self, tmp_path, capsys):
|
||||
"""Individual message role counts are shown."""
|
||||
cli_obj = _make_cli(tmp_path)
|
||||
cli_obj.agent = _make_agent_stub()
|
||||
cli_obj.conversation_history = [
|
||||
{"role": "user", "content": "Do something"},
|
||||
{"role": "assistant", "content": "OK", "tool_calls": [
|
||||
{"id": "call_1", "function": {"name": "terminal", "arguments": '{"command":"ls"}'}}
|
||||
]},
|
||||
{"role": "tool", "content": '{"output": "file1.py\\nfile2.py"}', "tool_call_id": "call_1"},
|
||||
{"role": "assistant", "content": "Found 2 files."},
|
||||
{"role": "user", "content": "Good"},
|
||||
]
|
||||
|
||||
cli_obj._show_context_breakdown()
|
||||
out = capsys.readouterr().out
|
||||
assert "User messages (2)" in out
|
||||
assert "Assistant messages (2)" in out
|
||||
assert "Tool results (1)" in out
|
||||
|
||||
def test_shows_compression_info(self, tmp_path, capsys):
|
||||
"""When compressions have occurred, that info is shown."""
|
||||
cli_obj = _make_cli(tmp_path)
|
||||
cli_obj.agent = _make_agent_stub(compression_count=2)
|
||||
cli_obj.conversation_history = []
|
||||
|
||||
cli_obj._show_context_breakdown()
|
||||
out = capsys.readouterr().out
|
||||
assert "Compressions this session: 2" in out
|
||||
|
||||
def test_shows_auto_compress_threshold(self, tmp_path, capsys):
|
||||
"""Auto-compress threshold and remaining tokens are shown."""
|
||||
cli_obj = _make_cli(tmp_path)
|
||||
cli_obj.agent = _make_agent_stub(threshold_tokens=160000)
|
||||
cli_obj.conversation_history = []
|
||||
|
||||
cli_obj._show_context_breakdown()
|
||||
out = capsys.readouterr().out
|
||||
assert "Auto-compress at" in out
|
||||
assert "remaining" in out
|
||||
|
||||
def test_detects_compaction_summaries(self, tmp_path, capsys):
|
||||
"""Messages containing compaction summary markers are identified."""
|
||||
from agent.context_compressor import SUMMARY_PREFIX
|
||||
|
||||
cli_obj = _make_cli(tmp_path)
|
||||
cli_obj.agent = _make_agent_stub()
|
||||
cli_obj.conversation_history = [
|
||||
{"role": "assistant", "content": f"{SUMMARY_PREFIX}\n## Goal\nBuild a feature."},
|
||||
{"role": "user", "content": "Continue from the summary."},
|
||||
]
|
||||
|
||||
cli_obj._show_context_breakdown()
|
||||
out = capsys.readouterr().out
|
||||
assert "Compaction summaries" in out
|
||||
|
||||
def test_bar_rendering(self, tmp_path, capsys):
|
||||
"""The progress bar renders block characters."""
|
||||
cli_obj = _make_cli(tmp_path)
|
||||
cli_obj.agent = _make_agent_stub()
|
||||
cli_obj.conversation_history = [
|
||||
{"role": "user", "content": "x" * 1000},
|
||||
]
|
||||
|
||||
cli_obj._show_context_breakdown()
|
||||
out = capsys.readouterr().out
|
||||
# Should contain block characters from the bar
|
||||
assert "█" in out or "░" in out
|
||||
|
||||
def test_identifies_skills_section(self, tmp_path, capsys):
|
||||
"""When system prompt contains skills marker, it's broken out."""
|
||||
system_prompt = (
|
||||
"You are Hermes.\n\n"
|
||||
"## Skills (mandatory)\n"
|
||||
"Before replying, scan the skills below.\n"
|
||||
"<available_skills>\n skill1: does something\n</available_skills>\n\n"
|
||||
"Conversation started: Friday, April 10, 2026"
|
||||
)
|
||||
cli_obj = _make_cli(tmp_path)
|
||||
cli_obj.agent = _make_agent_stub(system_prompt=system_prompt)
|
||||
cli_obj.conversation_history = []
|
||||
|
||||
cli_obj._show_context_breakdown()
|
||||
out = capsys.readouterr().out
|
||||
assert "Skills index" in out
|
||||
|
||||
def test_identifies_context_files_section(self, tmp_path, capsys):
|
||||
"""When system prompt contains context files marker, it's broken out."""
|
||||
system_prompt = (
|
||||
"You are Hermes.\n\n"
|
||||
"# Project Context\n\n"
|
||||
"## AGENTS.md\nDevelopment guide content here...\n\n"
|
||||
"Conversation started: Friday, April 10, 2026"
|
||||
)
|
||||
cli_obj = _make_cli(tmp_path)
|
||||
cli_obj.agent = _make_agent_stub(system_prompt=system_prompt)
|
||||
cli_obj.conversation_history = []
|
||||
|
||||
cli_obj._show_context_breakdown()
|
||||
out = capsys.readouterr().out
|
||||
assert "Context files" in out
|
||||
|
||||
|
||||
class TestCompressFocusTopic:
|
||||
"""Tests for /compress <focus> — guided compression."""
|
||||
|
||||
def test_focus_topic_extracted(self, tmp_path, capsys):
|
||||
"""Focus topic is extracted from the command string."""
|
||||
cli_obj = _make_cli(tmp_path)
|
||||
agent = _make_agent_stub()
|
||||
agent.compression_enabled = True
|
||||
agent._cached_system_prompt = "You are Hermes."
|
||||
# Make compress return the messages unchanged for testing
|
||||
agent._compress_context = MagicMock(return_value=(
|
||||
[{"role": "user", "content": "test"}],
|
||||
"system prompt",
|
||||
))
|
||||
cli_obj.agent = agent
|
||||
cli_obj.conversation_history = [
|
||||
{"role": "user", "content": "a"},
|
||||
{"role": "assistant", "content": "b"},
|
||||
{"role": "user", "content": "c"},
|
||||
{"role": "assistant", "content": "d"},
|
||||
]
|
||||
|
||||
cli_obj._manual_compress("/compress database schema")
|
||||
out = capsys.readouterr().out
|
||||
assert 'focus: "database schema"' in out
|
||||
|
||||
# Verify the focus_topic was passed through
|
||||
agent._compress_context.assert_called_once()
|
||||
call_kwargs = agent._compress_context.call_args
|
||||
assert call_kwargs.kwargs.get("focus_topic") == "database schema"
|
||||
|
||||
def test_no_focus_topic_when_bare_command(self, tmp_path, capsys):
|
||||
"""When no focus topic is provided, None is passed."""
|
||||
cli_obj = _make_cli(tmp_path)
|
||||
agent = _make_agent_stub()
|
||||
agent.compression_enabled = True
|
||||
agent._cached_system_prompt = "You are Hermes."
|
||||
agent._compress_context = MagicMock(return_value=(
|
||||
[{"role": "user", "content": "test"}],
|
||||
"system prompt",
|
||||
))
|
||||
cli_obj.agent = agent
|
||||
cli_obj.conversation_history = [
|
||||
{"role": "user", "content": "a"},
|
||||
{"role": "assistant", "content": "b"},
|
||||
{"role": "user", "content": "c"},
|
||||
{"role": "assistant", "content": "d"},
|
||||
]
|
||||
|
||||
cli_obj._manual_compress("/compress")
|
||||
agent._compress_context.assert_called_once()
|
||||
call_kwargs = agent._compress_context.call_args
|
||||
assert call_kwargs.kwargs.get("focus_topic") is None
|
||||
|
||||
def test_focus_topic_in_generate_summary_prompt(self):
|
||||
"""Focus topic is injected into the LLM prompt for summarization."""
|
||||
from agent.context_compressor import ContextCompressor
|
||||
|
||||
compressor = ContextCompressor.__new__(ContextCompressor)
|
||||
compressor.protect_first_n = 2
|
||||
compressor.protect_last_n = 5
|
||||
compressor.tail_token_budget = 20000
|
||||
compressor.context_length = 200000
|
||||
compressor.threshold_percent = 0.80
|
||||
compressor.threshold_tokens = 160000
|
||||
compressor.max_summary_tokens = 10000
|
||||
compressor.quiet_mode = True
|
||||
compressor.compression_count = 0
|
||||
compressor.last_prompt_tokens = 0
|
||||
compressor._previous_summary = None
|
||||
compressor._summary_failure_cooldown_until = 0.0
|
||||
compressor.summary_model = None
|
||||
|
||||
turns = [
|
||||
{"role": "user", "content": "Tell me about the database schema"},
|
||||
{"role": "assistant", "content": "The schema has tables: users, orders, products."},
|
||||
]
|
||||
|
||||
# Mock call_llm to capture the prompt
|
||||
captured_prompt = {}
|
||||
|
||||
def mock_call_llm(**kwargs):
|
||||
captured_prompt["messages"] = kwargs["messages"]
|
||||
resp = MagicMock()
|
||||
resp.choices = [MagicMock()]
|
||||
resp.choices[0].message.content = "## Goal\nUnderstand DB schema."
|
||||
return resp
|
||||
|
||||
with patch("agent.context_compressor.call_llm", mock_call_llm):
|
||||
result = compressor._generate_summary(turns, focus_topic="database schema")
|
||||
|
||||
assert result is not None
|
||||
prompt_text = captured_prompt["messages"][0]["content"]
|
||||
assert 'FOCUS TOPIC: "database schema"' in prompt_text
|
||||
assert "PRIORITISE" in prompt_text
|
||||
|
||||
def test_no_focus_topic_no_injection(self):
|
||||
"""Without focus_topic, the prompt doesn't contain focus guidance."""
|
||||
from agent.context_compressor import ContextCompressor
|
||||
|
||||
compressor = ContextCompressor.__new__(ContextCompressor)
|
||||
compressor.protect_first_n = 2
|
||||
compressor.protect_last_n = 5
|
||||
compressor.tail_token_budget = 20000
|
||||
compressor.context_length = 200000
|
||||
compressor.threshold_percent = 0.80
|
||||
compressor.threshold_tokens = 160000
|
||||
compressor.max_summary_tokens = 10000
|
||||
compressor.quiet_mode = True
|
||||
compressor.compression_count = 0
|
||||
compressor.last_prompt_tokens = 0
|
||||
compressor._previous_summary = None
|
||||
compressor._summary_failure_cooldown_until = 0.0
|
||||
compressor.summary_model = None
|
||||
|
||||
turns = [
|
||||
{"role": "user", "content": "Hello"},
|
||||
{"role": "assistant", "content": "Hi"},
|
||||
]
|
||||
|
||||
captured_prompt = {}
|
||||
|
||||
def mock_call_llm(**kwargs):
|
||||
captured_prompt["messages"] = kwargs["messages"]
|
||||
resp = MagicMock()
|
||||
resp.choices = [MagicMock()]
|
||||
resp.choices[0].message.content = "## Goal\nGreeting."
|
||||
return resp
|
||||
|
||||
with patch("agent.context_compressor.call_llm", mock_call_llm):
|
||||
result = compressor._generate_summary(turns)
|
||||
|
||||
prompt_text = captured_prompt["messages"][0]["content"]
|
||||
assert "FOCUS TOPIC" not in prompt_text
|
||||
Reference in New Issue
Block a user