mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-29 23:41:35 +08:00
Compare commits
1 Commits
fix/modal-
...
claude-cod
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
08b97660c5 |
@@ -267,13 +267,19 @@ class ContextCompressor:
|
|||||||
|
|
||||||
return "\n\n".join(parts)
|
return "\n\n".join(parts)
|
||||||
|
|
||||||
def _generate_summary(self, turns_to_summarize: List[Dict[str, Any]]) -> Optional[str]:
|
def _generate_summary(self, turns_to_summarize: List[Dict[str, Any]], focus_topic: str = None) -> Optional[str]:
|
||||||
"""Generate a structured summary of conversation turns.
|
"""Generate a structured summary of conversation turns.
|
||||||
|
|
||||||
Uses a structured template (Goal, Progress, Decisions, Files, Next Steps)
|
Uses a structured template (Goal, Progress, Decisions, Files, Next Steps)
|
||||||
inspired by Pi-mono and OpenCode. When a previous summary exists,
|
inspired by Pi-mono and OpenCode. When a previous summary exists,
|
||||||
generates an iterative update instead of summarizing from scratch.
|
generates an iterative update instead of summarizing from scratch.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
focus_topic: Optional focus string for guided compression. When
|
||||||
|
provided, the summariser prioritises preserving information
|
||||||
|
related to this topic and is more aggressive about compressing
|
||||||
|
everything else. Inspired by Claude Code's ``/compact``.
|
||||||
|
|
||||||
Returns None if all attempts fail — the caller should drop
|
Returns None if all attempts fail — the caller should drop
|
||||||
the middle turns without a summary rather than inject a useless
|
the middle turns without a summary rather than inject a useless
|
||||||
placeholder.
|
placeholder.
|
||||||
@@ -375,6 +381,14 @@ Target ~{summary_budget} tokens. Be specific — include file paths, command out
|
|||||||
|
|
||||||
Write only the summary body. Do not include any preamble or prefix."""
|
Write only the summary body. Do not include any preamble or prefix."""
|
||||||
|
|
||||||
|
# Inject focus topic guidance when the user provides one via /compress <focus>.
|
||||||
|
# This goes at the end of the prompt so it takes precedence.
|
||||||
|
if focus_topic:
|
||||||
|
prompt += f"""
|
||||||
|
|
||||||
|
FOCUS TOPIC: "{focus_topic}"
|
||||||
|
The user has requested that this compaction PRIORITISE preserving all information related to the focus topic above. For content related to "{focus_topic}", include full detail — exact values, file paths, command outputs, error messages, and decisions. For content NOT related to the focus topic, summarise more aggressively (brief one-liners or omit if truly irrelevant). The focus topic sections should receive roughly 60-70% of the summary token budget."""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
call_kwargs = {
|
call_kwargs = {
|
||||||
"task": "compression",
|
"task": "compression",
|
||||||
@@ -592,7 +606,7 @@ Write only the summary body. Do not include any preamble or prefix."""
|
|||||||
# Main compression entry point
|
# Main compression entry point
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
def compress(self, messages: List[Dict[str, Any]], current_tokens: int = None) -> List[Dict[str, Any]]:
|
def compress(self, messages: List[Dict[str, Any]], current_tokens: int = None, focus_topic: str = None) -> List[Dict[str, Any]]:
|
||||||
"""Compress conversation messages by summarizing middle turns.
|
"""Compress conversation messages by summarizing middle turns.
|
||||||
|
|
||||||
Algorithm:
|
Algorithm:
|
||||||
@@ -604,6 +618,12 @@ Write only the summary body. Do not include any preamble or prefix."""
|
|||||||
|
|
||||||
After compression, orphaned tool_call / tool_result pairs are cleaned
|
After compression, orphaned tool_call / tool_result pairs are cleaned
|
||||||
up so the API never receives mismatched IDs.
|
up so the API never receives mismatched IDs.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
focus_topic: Optional focus string for guided compression. When
|
||||||
|
provided, the summariser will prioritise preserving information
|
||||||
|
related to this topic and be more aggressive about compressing
|
||||||
|
everything else. Inspired by Claude Code's ``/compact``.
|
||||||
"""
|
"""
|
||||||
n_messages = len(messages)
|
n_messages = len(messages)
|
||||||
# Only need head + 3 tail messages minimum (token budget decides the real tail size)
|
# Only need head + 3 tail messages minimum (token budget decides the real tail size)
|
||||||
@@ -661,7 +681,7 @@ Write only the summary body. Do not include any preamble or prefix."""
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Phase 3: Generate structured summary
|
# Phase 3: Generate structured summary
|
||||||
summary = self._generate_summary(turns_to_summarize)
|
summary = self._generate_summary(turns_to_summarize, focus_topic=focus_topic)
|
||||||
|
|
||||||
# Phase 4: Assemble compressed message list
|
# Phase 4: Assemble compressed message list
|
||||||
compressed = []
|
compressed = []
|
||||||
|
|||||||
220
cli.py
220
cli.py
@@ -4962,7 +4962,9 @@ class HermesCLI:
|
|||||||
elif canonical == "fast":
|
elif canonical == "fast":
|
||||||
self._handle_fast_command(cmd_original)
|
self._handle_fast_command(cmd_original)
|
||||||
elif canonical == "compress":
|
elif canonical == "compress":
|
||||||
self._manual_compress()
|
self._manual_compress(cmd_original)
|
||||||
|
elif canonical == "context":
|
||||||
|
self._show_context_breakdown()
|
||||||
elif canonical == "usage":
|
elif canonical == "usage":
|
||||||
self._show_usage()
|
self._show_usage()
|
||||||
elif canonical == "insights":
|
elif canonical == "insights":
|
||||||
@@ -5818,8 +5820,14 @@ class HermesCLI:
|
|||||||
self._reasoning_preview_buf = getattr(self, "_reasoning_preview_buf", "") + reasoning_text
|
self._reasoning_preview_buf = getattr(self, "_reasoning_preview_buf", "") + reasoning_text
|
||||||
self._flush_reasoning_preview(force=False)
|
self._flush_reasoning_preview(force=False)
|
||||||
|
|
||||||
def _manual_compress(self):
|
def _manual_compress(self, cmd_original: str = ""):
|
||||||
"""Manually trigger context compression on the current conversation."""
|
"""Manually trigger context compression on the current conversation.
|
||||||
|
|
||||||
|
Accepts an optional focus topic: ``/compress <focus>`` guides the
|
||||||
|
summariser to preserve information related to *focus* while being
|
||||||
|
more aggressive about discarding everything else. Inspired by
|
||||||
|
Claude Code's ``/compact <focus>`` feature.
|
||||||
|
"""
|
||||||
if not self.conversation_history or len(self.conversation_history) < 4:
|
if not self.conversation_history or len(self.conversation_history) < 4:
|
||||||
print("(._.) Not enough conversation to compress (need at least 4 messages).")
|
print("(._.) Not enough conversation to compress (need at least 4 messages).")
|
||||||
return
|
return
|
||||||
@@ -5832,16 +5840,28 @@ class HermesCLI:
|
|||||||
print("(._.) Compression is disabled in config.")
|
print("(._.) Compression is disabled in config.")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Extract optional focus topic from the command (e.g. "/compress database schema")
|
||||||
|
focus_topic = ""
|
||||||
|
if cmd_original:
|
||||||
|
parts = cmd_original.strip().split(None, 1)
|
||||||
|
if len(parts) > 1:
|
||||||
|
focus_topic = parts[1].strip()
|
||||||
|
|
||||||
original_count = len(self.conversation_history)
|
original_count = len(self.conversation_history)
|
||||||
try:
|
try:
|
||||||
from agent.model_metadata import estimate_messages_tokens_rough
|
from agent.model_metadata import estimate_messages_tokens_rough
|
||||||
approx_tokens = estimate_messages_tokens_rough(self.conversation_history)
|
approx_tokens = estimate_messages_tokens_rough(self.conversation_history)
|
||||||
print(f"🗜️ Compressing {original_count} messages (~{approx_tokens:,} tokens)...")
|
if focus_topic:
|
||||||
|
print(f"🗜️ Compressing {original_count} messages (~{approx_tokens:,} tokens), "
|
||||||
|
f"focus: \"{focus_topic}\"...")
|
||||||
|
else:
|
||||||
|
print(f"🗜️ Compressing {original_count} messages (~{approx_tokens:,} tokens)...")
|
||||||
|
|
||||||
compressed, _new_system = self.agent._compress_context(
|
compressed, _new_system = self.agent._compress_context(
|
||||||
self.conversation_history,
|
self.conversation_history,
|
||||||
self.agent._cached_system_prompt or "",
|
self.agent._cached_system_prompt or "",
|
||||||
approx_tokens=approx_tokens,
|
approx_tokens=approx_tokens,
|
||||||
|
focus_topic=focus_topic or None,
|
||||||
)
|
)
|
||||||
self.conversation_history = compressed
|
self.conversation_history = compressed
|
||||||
new_count = len(self.conversation_history)
|
new_count = len(self.conversation_history)
|
||||||
@@ -5854,6 +5874,198 @@ class HermesCLI:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f" ❌ Compression failed: {e}")
|
print(f" ❌ Compression failed: {e}")
|
||||||
|
|
||||||
|
def _show_context_breakdown(self):
|
||||||
|
"""Show a live breakdown of context window usage by component.
|
||||||
|
|
||||||
|
Inspired by Claude Code's ``/context`` command — gives users visibility
|
||||||
|
into what is consuming their context window (system prompt, memory,
|
||||||
|
skills, context files, conversation messages, tool results, etc.).
|
||||||
|
"""
|
||||||
|
if not self.agent:
|
||||||
|
print("(._.) No active agent — send a message first.")
|
||||||
|
return
|
||||||
|
|
||||||
|
from agent.model_metadata import (
|
||||||
|
estimate_tokens_rough,
|
||||||
|
estimate_messages_tokens_rough,
|
||||||
|
)
|
||||||
|
|
||||||
|
agent = self.agent
|
||||||
|
compressor = getattr(agent, "context_compressor", None)
|
||||||
|
context_length = getattr(compressor, "context_length", 0) or 0
|
||||||
|
if not context_length:
|
||||||
|
from agent.model_metadata import get_model_context_length
|
||||||
|
context_length = get_model_context_length(agent.model or "")
|
||||||
|
|
||||||
|
# ── System prompt breakdown ────────────────────────────────
|
||||||
|
system_prompt = getattr(agent, "_cached_system_prompt", "") or ""
|
||||||
|
system_total = estimate_tokens_rough(system_prompt)
|
||||||
|
|
||||||
|
# Attempt to break down the system prompt into its component layers.
|
||||||
|
# The prompt is assembled by joining parts with "\n\n", so we can
|
||||||
|
# identify known sections by their content signatures.
|
||||||
|
components = []
|
||||||
|
if system_prompt:
|
||||||
|
from agent.prompt_builder import load_soul_md, DEFAULT_AGENT_IDENTITY
|
||||||
|
# Identity block
|
||||||
|
soul = load_soul_md()
|
||||||
|
if soul and soul[:60] in system_prompt:
|
||||||
|
identity_tokens = estimate_tokens_rough(soul)
|
||||||
|
components.append((" Identity (SOUL.md)", identity_tokens))
|
||||||
|
elif DEFAULT_AGENT_IDENTITY[:40] in system_prompt:
|
||||||
|
identity_tokens = estimate_tokens_rough(DEFAULT_AGENT_IDENTITY)
|
||||||
|
components.append((" Identity (built-in)", identity_tokens))
|
||||||
|
|
||||||
|
# Memory
|
||||||
|
mem_store = getattr(agent, "_memory_store", None)
|
||||||
|
if mem_store:
|
||||||
|
mem_block = mem_store.format_for_system_prompt("memory")
|
||||||
|
if mem_block and mem_block[:30] in system_prompt:
|
||||||
|
components.append((" Memory", estimate_tokens_rough(mem_block)))
|
||||||
|
user_block = mem_store.format_for_system_prompt("user")
|
||||||
|
if user_block and user_block[:30] in system_prompt:
|
||||||
|
components.append((" User profile", estimate_tokens_rough(user_block)))
|
||||||
|
|
||||||
|
# Skills
|
||||||
|
skills_marker = "## Skills (mandatory)"
|
||||||
|
if skills_marker in system_prompt:
|
||||||
|
skills_start = system_prompt.index(skills_marker)
|
||||||
|
# Find the next major section after skills
|
||||||
|
_next_sections = ["\nConversation started:", "\nYou are running as"]
|
||||||
|
skills_end = len(system_prompt)
|
||||||
|
for _sect in _next_sections:
|
||||||
|
idx = system_prompt.find(_sect, skills_start + 10)
|
||||||
|
if idx != -1:
|
||||||
|
skills_end = min(skills_end, idx)
|
||||||
|
skills_text = system_prompt[skills_start:skills_end]
|
||||||
|
components.append((" Skills index", estimate_tokens_rough(skills_text)))
|
||||||
|
|
||||||
|
# Context files (AGENTS.md, .cursorrules, etc.)
|
||||||
|
ctx_marker = "# Project Context"
|
||||||
|
if ctx_marker in system_prompt:
|
||||||
|
ctx_start = system_prompt.index(ctx_marker)
|
||||||
|
ctx_text = system_prompt[ctx_start:]
|
||||||
|
# Trim to just the context files section
|
||||||
|
for _end_mark in ["\nConversation started:", "\n## Skills"]:
|
||||||
|
idx = ctx_text.find(_end_mark, 10)
|
||||||
|
if idx != -1:
|
||||||
|
ctx_text = ctx_text[:idx]
|
||||||
|
break
|
||||||
|
components.append((" Context files", estimate_tokens_rough(ctx_text)))
|
||||||
|
|
||||||
|
# Tool-use guidance, platform hints, timestamps — remainder
|
||||||
|
accounted = sum(t for _, t in components)
|
||||||
|
remainder = max(0, system_total - accounted)
|
||||||
|
if remainder > 50:
|
||||||
|
components.append((" Other (guidance, hints, timestamp)", remainder))
|
||||||
|
|
||||||
|
# ── Conversation breakdown ─────────────────────────────────
|
||||||
|
msgs = self.conversation_history or []
|
||||||
|
msg_counts = {"user": 0, "assistant": 0, "tool": 0, "system": 0}
|
||||||
|
msg_tokens = {"user": 0, "assistant": 0, "tool": 0, "system": 0}
|
||||||
|
tool_result_tokens = 0
|
||||||
|
tool_call_tokens = 0
|
||||||
|
compaction_summary_tokens = 0
|
||||||
|
|
||||||
|
from agent.context_compressor import SUMMARY_PREFIX, LEGACY_SUMMARY_PREFIX
|
||||||
|
for msg in msgs:
|
||||||
|
role = msg.get("role", "unknown")
|
||||||
|
content = msg.get("content", "")
|
||||||
|
content_str = str(content) if content else ""
|
||||||
|
tokens = estimate_tokens_rough(content_str)
|
||||||
|
|
||||||
|
# Count tool_calls in assistant messages
|
||||||
|
tool_calls = msg.get("tool_calls")
|
||||||
|
if tool_calls:
|
||||||
|
tc_str = str(tool_calls)
|
||||||
|
tool_call_tokens += estimate_tokens_rough(tc_str)
|
||||||
|
|
||||||
|
if role in msg_counts:
|
||||||
|
msg_counts[role] += 1
|
||||||
|
msg_tokens[role] += tokens
|
||||||
|
else:
|
||||||
|
msg_counts.setdefault(role, 0)
|
||||||
|
msg_tokens.setdefault(role, 0)
|
||||||
|
msg_counts[role] += 1
|
||||||
|
msg_tokens[role] += tokens
|
||||||
|
|
||||||
|
if role == "tool":
|
||||||
|
tool_result_tokens += tokens
|
||||||
|
|
||||||
|
# Detect compaction summaries
|
||||||
|
if content_str and (SUMMARY_PREFIX in content_str or LEGACY_SUMMARY_PREFIX in content_str):
|
||||||
|
compaction_summary_tokens += tokens
|
||||||
|
|
||||||
|
conversation_total = estimate_messages_tokens_rough(msgs)
|
||||||
|
|
||||||
|
# ── Tool schemas ───────────────────────────────────────────
|
||||||
|
tool_schemas_tokens = 0
|
||||||
|
try:
|
||||||
|
tool_schemas = getattr(agent, "_cached_tool_schemas", None)
|
||||||
|
if tool_schemas:
|
||||||
|
tool_schemas_tokens = estimate_tokens_rough(str(tool_schemas))
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# ── Grand total ────────────────────────────────────────────
|
||||||
|
grand_total = system_total + conversation_total + tool_schemas_tokens
|
||||||
|
percent = round((grand_total / context_length) * 100) if context_length else 0
|
||||||
|
|
||||||
|
# ── Render ─────────────────────────────────────────────────
|
||||||
|
def _bar(tokens, total, width=20):
|
||||||
|
if total <= 0:
|
||||||
|
return ""
|
||||||
|
filled = max(0, min(width, round((tokens / total) * width)))
|
||||||
|
return "█" * filled + "░" * (width - filled)
|
||||||
|
|
||||||
|
def _fmt(tokens):
|
||||||
|
if tokens >= 1000:
|
||||||
|
return f"{tokens / 1000:.1f}K"
|
||||||
|
return str(tokens)
|
||||||
|
|
||||||
|
print()
|
||||||
|
model_short = (agent.model or "unknown").split("/")[-1]
|
||||||
|
print(f"◎ Context Window — {model_short}")
|
||||||
|
print(f" {_bar(grand_total, context_length, 30)} {_fmt(grand_total)} / {_fmt(context_length)} tokens ({percent}%)")
|
||||||
|
print()
|
||||||
|
|
||||||
|
# System prompt
|
||||||
|
print(f" ◆ System Prompt {_fmt(system_total):>8}")
|
||||||
|
for label, toks in components:
|
||||||
|
print(f" {label:<28} {_fmt(toks):>8}")
|
||||||
|
|
||||||
|
# Tool schemas
|
||||||
|
if tool_schemas_tokens:
|
||||||
|
n_tools = len(tool_schemas) if tool_schemas else 0
|
||||||
|
print(f" ◆ Tool Schemas ({n_tools} tools) {_fmt(tool_schemas_tokens):>8}")
|
||||||
|
|
||||||
|
# Conversation
|
||||||
|
total_msgs = sum(msg_counts.values())
|
||||||
|
print(f" ◆ Conversation ({total_msgs} msgs) {_fmt(conversation_total):>8}")
|
||||||
|
if msg_counts.get("user", 0):
|
||||||
|
print(f" User messages ({msg_counts['user']}) {_fmt(msg_tokens['user']):>8}")
|
||||||
|
if msg_counts.get("assistant", 0):
|
||||||
|
print(f" Assistant messages ({msg_counts['assistant']}) {_fmt(msg_tokens['assistant']):>8}")
|
||||||
|
if msg_counts.get("tool", 0):
|
||||||
|
print(f" Tool results ({msg_counts['tool']}) {_fmt(tool_result_tokens):>8}")
|
||||||
|
if tool_call_tokens:
|
||||||
|
print(f" Tool calls {_fmt(tool_call_tokens):>8}")
|
||||||
|
if compaction_summary_tokens:
|
||||||
|
print(f" Compaction summaries {_fmt(compaction_summary_tokens):>8}")
|
||||||
|
|
||||||
|
# Compression info
|
||||||
|
compressions = getattr(compressor, "compression_count", 0) or 0
|
||||||
|
if compressions:
|
||||||
|
print(f"\n ⚙ Compressions this session: {compressions}")
|
||||||
|
|
||||||
|
# Threshold info
|
||||||
|
if compressor:
|
||||||
|
threshold = getattr(compressor, "threshold_tokens", 0) or 0
|
||||||
|
if threshold:
|
||||||
|
remaining = max(0, threshold - grand_total)
|
||||||
|
print(f" ⚙ Auto-compress at: ~{_fmt(threshold)} tokens ({_fmt(remaining)} remaining)")
|
||||||
|
print()
|
||||||
|
|
||||||
def _show_usage(self):
|
def _show_usage(self):
|
||||||
"""Show rate limits (if available) and session token usage."""
|
"""Show rate limits (if available) and session token usage."""
|
||||||
if not self.agent:
|
if not self.agent:
|
||||||
|
|||||||
@@ -69,7 +69,10 @@ COMMAND_REGISTRY: list[CommandDef] = [
|
|||||||
args_hint="[name]"),
|
args_hint="[name]"),
|
||||||
CommandDef("branch", "Branch the current session (explore a different path)", "Session",
|
CommandDef("branch", "Branch the current session (explore a different path)", "Session",
|
||||||
aliases=("fork",), args_hint="[name]"),
|
aliases=("fork",), args_hint="[name]"),
|
||||||
CommandDef("compress", "Manually compress conversation context", "Session"),
|
CommandDef("compress", "Manually compress conversation context", "Session",
|
||||||
|
args_hint="[focus topic]"),
|
||||||
|
CommandDef("context", "Show live context window breakdown (token usage per component)",
|
||||||
|
"Info", aliases=("ctx",)),
|
||||||
CommandDef("rollback", "List or restore filesystem checkpoints", "Session",
|
CommandDef("rollback", "List or restore filesystem checkpoints", "Session",
|
||||||
args_hint="[number]"),
|
args_hint="[number]"),
|
||||||
CommandDef("stop", "Kill all running background processes", "Session"),
|
CommandDef("stop", "Kill all running background processes", "Session"),
|
||||||
|
|||||||
12
run_agent.py
12
run_agent.py
@@ -6281,17 +6281,23 @@ class AIAgent:
|
|||||||
if messages and messages[-1].get("_flush_sentinel") == _sentinel:
|
if messages and messages[-1].get("_flush_sentinel") == _sentinel:
|
||||||
messages.pop()
|
messages.pop()
|
||||||
|
|
||||||
def _compress_context(self, messages: list, system_message: str, *, approx_tokens: int = None, task_id: str = "default") -> tuple:
|
def _compress_context(self, messages: list, system_message: str, *, approx_tokens: int = None, task_id: str = "default", focus_topic: str = None) -> tuple:
|
||||||
"""Compress conversation context and split the session in SQLite.
|
"""Compress conversation context and split the session in SQLite.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
focus_topic: Optional focus string for guided compression — the
|
||||||
|
summariser will prioritise preserving information related to
|
||||||
|
this topic. Inspired by Claude Code's ``/compact <focus>``.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
(compressed_messages, new_system_prompt) tuple
|
(compressed_messages, new_system_prompt) tuple
|
||||||
"""
|
"""
|
||||||
_pre_msg_count = len(messages)
|
_pre_msg_count = len(messages)
|
||||||
logger.info(
|
logger.info(
|
||||||
"context compression started: session=%s messages=%d tokens=~%s model=%s",
|
"context compression started: session=%s messages=%d tokens=~%s model=%s focus=%r",
|
||||||
self.session_id or "none", _pre_msg_count,
|
self.session_id or "none", _pre_msg_count,
|
||||||
f"{approx_tokens:,}" if approx_tokens else "unknown", self.model,
|
f"{approx_tokens:,}" if approx_tokens else "unknown", self.model,
|
||||||
|
focus_topic,
|
||||||
)
|
)
|
||||||
# Pre-compression memory flush: let the model save memories before they're lost
|
# Pre-compression memory flush: let the model save memories before they're lost
|
||||||
self.flush_memories(messages, min_turns=0)
|
self.flush_memories(messages, min_turns=0)
|
||||||
@@ -6303,7 +6309,7 @@ class AIAgent:
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
compressed = self.context_compressor.compress(messages, current_tokens=approx_tokens)
|
compressed = self.context_compressor.compress(messages, current_tokens=approx_tokens, focus_topic=focus_topic)
|
||||||
|
|
||||||
todo_snapshot = self._todo_store.format_for_injection()
|
todo_snapshot = self._todo_store.format_for_injection()
|
||||||
if todo_snapshot:
|
if todo_snapshot:
|
||||||
|
|||||||
345
tests/cli/test_context_breakdown.py
Normal file
345
tests/cli/test_context_breakdown.py
Normal file
@@ -0,0 +1,345 @@
|
|||||||
|
"""Tests for /context command — live context window breakdown.
|
||||||
|
|
||||||
|
Inspired by Claude Code's /context feature.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _make_cli(tmp_path):
|
||||||
|
"""Build a minimal HermesCLI stub with enough state for _show_context_breakdown."""
|
||||||
|
from cli import HermesCLI
|
||||||
|
|
||||||
|
cli_obj = object.__new__(HermesCLI)
|
||||||
|
# Minimal attrs expected by _show_context_breakdown
|
||||||
|
cli_obj.agent = None
|
||||||
|
cli_obj.conversation_history = []
|
||||||
|
return cli_obj
|
||||||
|
|
||||||
|
|
||||||
|
def _make_agent_stub(model="anthropic/claude-sonnet-4.6", system_prompt="You are Hermes.",
|
||||||
|
context_length=200000, compression_count=0, threshold_tokens=160000,
|
||||||
|
last_prompt_tokens=50000):
|
||||||
|
"""Return a mock agent with attributes used by _show_context_breakdown."""
|
||||||
|
agent = MagicMock()
|
||||||
|
agent.model = model
|
||||||
|
agent._cached_system_prompt = system_prompt
|
||||||
|
agent.session_input_tokens = 1000
|
||||||
|
agent.session_output_tokens = 500
|
||||||
|
|
||||||
|
compressor = MagicMock()
|
||||||
|
compressor.context_length = context_length
|
||||||
|
compressor.compression_count = compression_count
|
||||||
|
compressor.threshold_tokens = threshold_tokens
|
||||||
|
compressor.last_prompt_tokens = last_prompt_tokens
|
||||||
|
agent.context_compressor = compressor
|
||||||
|
|
||||||
|
agent._memory_store = None
|
||||||
|
agent._cached_tool_schemas = None
|
||||||
|
return agent
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Tests
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestContextBreakdown:
|
||||||
|
"""Tests for _show_context_breakdown method."""
|
||||||
|
|
||||||
|
def test_no_agent(self, tmp_path, capsys):
|
||||||
|
"""When no agent is active, prints a helpful message."""
|
||||||
|
cli_obj = _make_cli(tmp_path)
|
||||||
|
cli_obj._show_context_breakdown()
|
||||||
|
out = capsys.readouterr().out
|
||||||
|
assert "No active agent" in out
|
||||||
|
|
||||||
|
def test_basic_breakdown(self, tmp_path, capsys):
|
||||||
|
"""Basic breakdown shows model, context bar, and section headers."""
|
||||||
|
cli_obj = _make_cli(tmp_path)
|
||||||
|
cli_obj.agent = _make_agent_stub()
|
||||||
|
cli_obj.conversation_history = [
|
||||||
|
{"role": "user", "content": "Hello"},
|
||||||
|
{"role": "assistant", "content": "Hi there!"},
|
||||||
|
]
|
||||||
|
|
||||||
|
cli_obj._show_context_breakdown()
|
||||||
|
out = capsys.readouterr().out
|
||||||
|
|
||||||
|
# Model name should appear
|
||||||
|
assert "claude-sonnet-4.6" in out
|
||||||
|
# Section headers
|
||||||
|
assert "System Prompt" in out
|
||||||
|
assert "Conversation" in out
|
||||||
|
# Token counts appear
|
||||||
|
assert "tokens" in out
|
||||||
|
|
||||||
|
def test_shows_context_percentage(self, tmp_path, capsys):
|
||||||
|
"""The context usage percentage is displayed."""
|
||||||
|
cli_obj = _make_cli(tmp_path)
|
||||||
|
cli_obj.agent = _make_agent_stub()
|
||||||
|
cli_obj.conversation_history = []
|
||||||
|
|
||||||
|
cli_obj._show_context_breakdown()
|
||||||
|
out = capsys.readouterr().out
|
||||||
|
assert "%" in out
|
||||||
|
|
||||||
|
def test_shows_tool_schemas_when_present(self, tmp_path, capsys):
|
||||||
|
"""When tool schemas are cached, their token count is shown."""
|
||||||
|
cli_obj = _make_cli(tmp_path)
|
||||||
|
agent = _make_agent_stub()
|
||||||
|
agent._cached_tool_schemas = [
|
||||||
|
{"name": "tool1", "description": "Does something", "parameters": {}},
|
||||||
|
{"name": "tool2", "description": "Does another thing", "parameters": {}},
|
||||||
|
]
|
||||||
|
cli_obj.agent = agent
|
||||||
|
cli_obj.conversation_history = []
|
||||||
|
|
||||||
|
cli_obj._show_context_breakdown()
|
||||||
|
out = capsys.readouterr().out
|
||||||
|
assert "Tool Schemas" in out
|
||||||
|
assert "2 tools" in out
|
||||||
|
|
||||||
|
def test_shows_message_role_breakdown(self, tmp_path, capsys):
|
||||||
|
"""Individual message role counts are shown."""
|
||||||
|
cli_obj = _make_cli(tmp_path)
|
||||||
|
cli_obj.agent = _make_agent_stub()
|
||||||
|
cli_obj.conversation_history = [
|
||||||
|
{"role": "user", "content": "Do something"},
|
||||||
|
{"role": "assistant", "content": "OK", "tool_calls": [
|
||||||
|
{"id": "call_1", "function": {"name": "terminal", "arguments": '{"command":"ls"}'}}
|
||||||
|
]},
|
||||||
|
{"role": "tool", "content": '{"output": "file1.py\\nfile2.py"}', "tool_call_id": "call_1"},
|
||||||
|
{"role": "assistant", "content": "Found 2 files."},
|
||||||
|
{"role": "user", "content": "Good"},
|
||||||
|
]
|
||||||
|
|
||||||
|
cli_obj._show_context_breakdown()
|
||||||
|
out = capsys.readouterr().out
|
||||||
|
assert "User messages (2)" in out
|
||||||
|
assert "Assistant messages (2)" in out
|
||||||
|
assert "Tool results (1)" in out
|
||||||
|
|
||||||
|
def test_shows_compression_info(self, tmp_path, capsys):
|
||||||
|
"""When compressions have occurred, that info is shown."""
|
||||||
|
cli_obj = _make_cli(tmp_path)
|
||||||
|
cli_obj.agent = _make_agent_stub(compression_count=2)
|
||||||
|
cli_obj.conversation_history = []
|
||||||
|
|
||||||
|
cli_obj._show_context_breakdown()
|
||||||
|
out = capsys.readouterr().out
|
||||||
|
assert "Compressions this session: 2" in out
|
||||||
|
|
||||||
|
def test_shows_auto_compress_threshold(self, tmp_path, capsys):
|
||||||
|
"""Auto-compress threshold and remaining tokens are shown."""
|
||||||
|
cli_obj = _make_cli(tmp_path)
|
||||||
|
cli_obj.agent = _make_agent_stub(threshold_tokens=160000)
|
||||||
|
cli_obj.conversation_history = []
|
||||||
|
|
||||||
|
cli_obj._show_context_breakdown()
|
||||||
|
out = capsys.readouterr().out
|
||||||
|
assert "Auto-compress at" in out
|
||||||
|
assert "remaining" in out
|
||||||
|
|
||||||
|
def test_detects_compaction_summaries(self, tmp_path, capsys):
|
||||||
|
"""Messages containing compaction summary markers are identified."""
|
||||||
|
from agent.context_compressor import SUMMARY_PREFIX
|
||||||
|
|
||||||
|
cli_obj = _make_cli(tmp_path)
|
||||||
|
cli_obj.agent = _make_agent_stub()
|
||||||
|
cli_obj.conversation_history = [
|
||||||
|
{"role": "assistant", "content": f"{SUMMARY_PREFIX}\n## Goal\nBuild a feature."},
|
||||||
|
{"role": "user", "content": "Continue from the summary."},
|
||||||
|
]
|
||||||
|
|
||||||
|
cli_obj._show_context_breakdown()
|
||||||
|
out = capsys.readouterr().out
|
||||||
|
assert "Compaction summaries" in out
|
||||||
|
|
||||||
|
def test_bar_rendering(self, tmp_path, capsys):
|
||||||
|
"""The progress bar renders block characters."""
|
||||||
|
cli_obj = _make_cli(tmp_path)
|
||||||
|
cli_obj.agent = _make_agent_stub()
|
||||||
|
cli_obj.conversation_history = [
|
||||||
|
{"role": "user", "content": "x" * 1000},
|
||||||
|
]
|
||||||
|
|
||||||
|
cli_obj._show_context_breakdown()
|
||||||
|
out = capsys.readouterr().out
|
||||||
|
# Should contain block characters from the bar
|
||||||
|
assert "█" in out or "░" in out
|
||||||
|
|
||||||
|
def test_identifies_skills_section(self, tmp_path, capsys):
|
||||||
|
"""When system prompt contains skills marker, it's broken out."""
|
||||||
|
system_prompt = (
|
||||||
|
"You are Hermes.\n\n"
|
||||||
|
"## Skills (mandatory)\n"
|
||||||
|
"Before replying, scan the skills below.\n"
|
||||||
|
"<available_skills>\n skill1: does something\n</available_skills>\n\n"
|
||||||
|
"Conversation started: Friday, April 10, 2026"
|
||||||
|
)
|
||||||
|
cli_obj = _make_cli(tmp_path)
|
||||||
|
cli_obj.agent = _make_agent_stub(system_prompt=system_prompt)
|
||||||
|
cli_obj.conversation_history = []
|
||||||
|
|
||||||
|
cli_obj._show_context_breakdown()
|
||||||
|
out = capsys.readouterr().out
|
||||||
|
assert "Skills index" in out
|
||||||
|
|
||||||
|
def test_identifies_context_files_section(self, tmp_path, capsys):
|
||||||
|
"""When system prompt contains context files marker, it's broken out."""
|
||||||
|
system_prompt = (
|
||||||
|
"You are Hermes.\n\n"
|
||||||
|
"# Project Context\n\n"
|
||||||
|
"## AGENTS.md\nDevelopment guide content here...\n\n"
|
||||||
|
"Conversation started: Friday, April 10, 2026"
|
||||||
|
)
|
||||||
|
cli_obj = _make_cli(tmp_path)
|
||||||
|
cli_obj.agent = _make_agent_stub(system_prompt=system_prompt)
|
||||||
|
cli_obj.conversation_history = []
|
||||||
|
|
||||||
|
cli_obj._show_context_breakdown()
|
||||||
|
out = capsys.readouterr().out
|
||||||
|
assert "Context files" in out
|
||||||
|
|
||||||
|
|
||||||
|
class TestCompressFocusTopic:
    """Tests for /compress <focus> — guided compression.

    Covers both halves of the feature: the CLI extracting an optional focus
    topic from the ``/compress`` command string and forwarding it to
    ``agent._compress_context``, and ``ContextCompressor._generate_summary``
    injecting that topic into the summarisation prompt.
    """

    @staticmethod
    def _cli_with_compressing_agent(tmp_path):
        """Build a CLI wired to an agent stub ready for manual compression.

        The stub has compression enabled, a cached system prompt, a mocked
        ``_compress_context`` (so no LLM call happens), and a four-turn
        conversation history — the shared fixture for the CLI-side tests.
        Returns ``(cli_obj, agent)``.
        """
        cli_obj = _make_cli(tmp_path)
        agent = _make_agent_stub()
        agent.compression_enabled = True
        agent._cached_system_prompt = "You are Hermes."
        # Make compress return the messages unchanged for testing.
        agent._compress_context = MagicMock(return_value=(
            [{"role": "user", "content": "test"}],
            "system prompt",
        ))
        cli_obj.agent = agent
        cli_obj.conversation_history = [
            {"role": "user", "content": "a"},
            {"role": "assistant", "content": "b"},
            {"role": "user", "content": "c"},
            {"role": "assistant", "content": "d"},
        ]
        return cli_obj, agent

    @staticmethod
    def _bare_compressor():
        """Construct a ContextCompressor without running ``__init__``.

        ``__new__`` skips any constructor side effects; every attribute
        ``_generate_summary`` reads is then set explicitly so the prompt-
        building tests exercise only the summarisation path.
        """
        from agent.context_compressor import ContextCompressor

        compressor = ContextCompressor.__new__(ContextCompressor)
        compressor.protect_first_n = 2
        compressor.protect_last_n = 5
        compressor.tail_token_budget = 20000
        compressor.context_length = 200000
        compressor.threshold_percent = 0.80
        compressor.threshold_tokens = 160000
        compressor.max_summary_tokens = 10000
        compressor.quiet_mode = True
        compressor.compression_count = 0
        compressor.last_prompt_tokens = 0
        compressor._previous_summary = None
        compressor._summary_failure_cooldown_until = 0.0
        compressor.summary_model = None
        return compressor

    @staticmethod
    def _capturing_llm(captured, reply_text):
        """Return a ``call_llm`` replacement that records the prompt.

        Stores ``kwargs["messages"]`` into *captured* and returns a mocked
        response whose single choice carries *reply_text* as its content.
        """
        def mock_call_llm(**kwargs):
            captured["messages"] = kwargs["messages"]
            resp = MagicMock()
            resp.choices = [MagicMock()]
            resp.choices[0].message.content = reply_text
            return resp

        return mock_call_llm

    def test_focus_topic_extracted(self, tmp_path, capsys):
        """Focus topic is extracted from the command string."""
        cli_obj, agent = self._cli_with_compressing_agent(tmp_path)

        cli_obj._manual_compress("/compress database schema")
        out = capsys.readouterr().out
        assert 'focus: "database schema"' in out

        # Verify the focus_topic was passed through
        agent._compress_context.assert_called_once()
        call_kwargs = agent._compress_context.call_args
        assert call_kwargs.kwargs.get("focus_topic") == "database schema"

    def test_no_focus_topic_when_bare_command(self, tmp_path, capsys):
        """When no focus topic is provided, None is passed."""
        cli_obj, agent = self._cli_with_compressing_agent(tmp_path)

        cli_obj._manual_compress("/compress")
        agent._compress_context.assert_called_once()
        call_kwargs = agent._compress_context.call_args
        assert call_kwargs.kwargs.get("focus_topic") is None

    def test_focus_topic_in_generate_summary_prompt(self):
        """Focus topic is injected into the LLM prompt for summarization."""
        compressor = self._bare_compressor()

        turns = [
            {"role": "user", "content": "Tell me about the database schema"},
            {"role": "assistant", "content": "The schema has tables: users, orders, products."},
        ]

        # Mock call_llm to capture the prompt
        captured_prompt = {}
        mock_call_llm = self._capturing_llm(
            captured_prompt, "## Goal\nUnderstand DB schema."
        )

        with patch("agent.context_compressor.call_llm", mock_call_llm):
            result = compressor._generate_summary(turns, focus_topic="database schema")

        assert result is not None
        prompt_text = captured_prompt["messages"][0]["content"]
        assert 'FOCUS TOPIC: "database schema"' in prompt_text
        assert "PRIORITISE" in prompt_text

    def test_no_focus_topic_no_injection(self):
        """Without focus_topic, the prompt doesn't contain focus guidance."""
        compressor = self._bare_compressor()

        turns = [
            {"role": "user", "content": "Hello"},
            {"role": "assistant", "content": "Hi"},
        ]

        captured_prompt = {}
        mock_call_llm = self._capturing_llm(captured_prompt, "## Goal\nGreeting.")

        with patch("agent.context_compressor.call_llm", mock_call_llm):
            result = compressor._generate_summary(turns)

        prompt_text = captured_prompt["messages"][0]["content"]
        assert "FOCUS TOPIC" not in prompt_text
|
||||||
Reference in New Issue
Block a user