Compare commits

...

4 Commits

Author SHA1 Message Date
Teknium
d0d57bcde1 fix: robust context engine interface — config selection, plugin discovery, ABC completeness
Follow-up fixes for the context engine plugin slot (PR #5700):

- Enhance ContextEngine ABC: add threshold_percent, protect_first_n,
  protect_last_n as class attributes; complete update_model() default
  with threshold recalculation; clarify on_session_end() lifecycle docs
- Add ContextCompressor.update_model() override for model/provider/
  base_url/api_key updates
- Replace all direct compressor internal access in run_agent.py with
  ABC interface: switch_model(), fallback restore, context probing
  all use update_model() now; _context_probed guarded with getattr/
  hasattr for plugin engine compatibility
- Create plugins/context_engine/ directory with discovery module
  (mirrors plugins/memory/ pattern) — discover_context_engines(),
  load_context_engine()
- Add context.engine config key to DEFAULT_CONFIG (default: compressor)
- Config-driven engine selection in run_agent.__init__: checks config,
  then plugins/context_engine/<name>/, then general plugin system,
  falls back to built-in ContextCompressor
- Wire on_session_end() in shutdown_memory_provider() at real session
  boundaries (CLI exit, /reset, gateway expiry)
2026-04-08 04:16:58 -07:00
Stephen Schoettler
51751cdaf4 feat: wire context engine tools, session lifecycle, and tool dispatch
- Inject engine tool schemas into agent tool surface after compressor init
- Call on_session_start() with session_id, hermes_home, platform, model
- Dispatch engine tool calls (lcm_grep, etc.) before regular tool handler
- 55/55 tests pass
2026-04-08 04:05:14 -07:00
Stephen Schoettler
e7209789b9 feat: wire context engine plugin slot into agent and plugin system
- PluginContext.register_context_engine() lets plugins replace the
  built-in ContextCompressor with a custom ContextEngine implementation
- PluginManager stores the registered engine; only one allowed
- run_agent.py checks for a plugin engine at init before falling back
  to the default ContextCompressor
- reset_session_state() now calls engine.on_session_reset() instead of
  poking internal attributes directly
- ContextCompressor.on_session_reset() handles its own internals
  (_context_probed, _previous_summary, etc.)
- 19 new tests covering ABC contract, defaults, plugin slot registration,
  rejection of duplicates/non-engines, and compressor reset behavior
- All 34 existing compressor tests pass unchanged
2026-04-08 04:05:14 -07:00
Stephen Schoettler
ff95ec1c54 feat: add ContextEngine ABC, refactor ContextCompressor to inherit from it
Introduces agent/context_engine.py — an abstract base class that defines
the pluggable context engine interface. ContextCompressor now inherits
from ContextEngine as the default implementation.

No behavior change. All 34 existing compressor tests pass.

This is the foundation for a context engine plugin slot, enabling
third-party engines like LCM (Lossless Context Management) to replace
the built-in compressor via the plugin system.
2026-04-08 04:05:14 -07:00
7 changed files with 920 additions and 77 deletions

View File

@@ -18,6 +18,7 @@ import time
from typing import Any, Dict, List, Optional
from agent.auxiliary_client import call_llm
from agent.context_engine import ContextEngine
from agent.model_metadata import (
get_model_context_length,
estimate_messages_tokens_rough,
@@ -50,8 +51,8 @@ _CHARS_PER_TOKEN = 4
_SUMMARY_FAILURE_COOLDOWN_SECONDS = 600
class ContextCompressor:
"""Compresses conversation context when approaching the model's context limit.
class ContextCompressor(ContextEngine):
"""Default context engine — compresses conversation context via lossy summarization.
Algorithm:
1. Prune old tool results (cheap, no LLM call)
@@ -61,6 +62,33 @@ class ContextCompressor:
5. On subsequent compactions, iteratively update the previous summary
"""
@property
def name(self) -> str:
return "compressor"
def on_session_reset(self) -> None:
"""Reset all per-session state for /new or /reset."""
super().on_session_reset()
self._context_probed = False
self._context_probe_persistable = False
self._previous_summary = None
def update_model(
self,
model: str,
context_length: int,
base_url: str = "",
api_key: str = "",
provider: str = "",
) -> None:
"""Update model info after a model switch or fallback activation."""
self.model = model
self.base_url = base_url
self.api_key = api_key
self.provider = provider
self.context_length = context_length
self.threshold_tokens = int(context_length * self.threshold_percent)
def __init__(
self,
model: str,

184
agent/context_engine.py Normal file
View File

@@ -0,0 +1,184 @@
"""Abstract base class for pluggable context engines.
A context engine controls how conversation context is managed when
approaching the model's token limit. The built-in ContextCompressor
is the default implementation. Third-party engines (e.g. LCM) can
replace it via the plugin system or by being placed in the
``plugins/context_engine/<name>/`` directory.
Selection is config-driven: ``context.engine`` in config.yaml.
Default is ``"compressor"`` (the built-in). Only one engine is active.
The engine is responsible for:
- Deciding when compaction should fire
- Performing compaction (summarization, DAG construction, etc.)
- Optionally exposing tools the agent can call (e.g. lcm_grep)
- Tracking token usage from API responses
Lifecycle:
1. Engine is instantiated and registered (plugin register() or default)
2. on_session_start() called when a conversation begins
3. update_from_response() called after each API response with usage data
4. should_compress() checked after each turn
5. compress() called when should_compress() returns True
6. on_session_end() called at real session boundaries (CLI exit, /reset,
gateway session expiry) — NOT per-turn
"""
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
class ContextEngine(ABC):
"""Base class all context engines must implement."""
# -- Identity ----------------------------------------------------------
@property
@abstractmethod
def name(self) -> str:
"""Short identifier (e.g. 'compressor', 'lcm')."""
# -- Token state (read by run_agent.py for display/logging) ------------
#
# Engines MUST maintain these. run_agent.py reads them directly.
last_prompt_tokens: int = 0
last_completion_tokens: int = 0
last_total_tokens: int = 0
threshold_tokens: int = 0
context_length: int = 0
compression_count: int = 0
# -- Compaction parameters (read by run_agent.py for preflight) --------
#
# These control the preflight compression check. Subclasses may
# override via __init__ or property; defaults are sensible for most
# engines.
threshold_percent: float = 0.75
protect_first_n: int = 3
protect_last_n: int = 6
# -- Core interface ----------------------------------------------------
@abstractmethod
def update_from_response(self, usage: Dict[str, Any]) -> None:
"""Update tracked token usage from an API response.
Called after every LLM call with the usage dict from the response.
"""
@abstractmethod
def should_compress(self, prompt_tokens: int = None) -> bool:
"""Return True if compaction should fire this turn."""
@abstractmethod
def compress(
self,
messages: List[Dict[str, Any]],
current_tokens: int = None,
) -> List[Dict[str, Any]]:
"""Compact the message list and return the new message list.
This is the main entry point. The engine receives the full message
list and returns a (possibly shorter) list that fits within the
context budget. The implementation is free to summarize, build a
DAG, or do anything else — as long as the returned list is a valid
OpenAI-format message sequence.
"""
# -- Optional: pre-flight check ----------------------------------------
def should_compress_preflight(self, messages: List[Dict[str, Any]]) -> bool:
"""Quick rough check before the API call (no real token count yet).
Default returns False (skip pre-flight). Override if your engine
can do a cheap estimate.
"""
return False
# -- Optional: session lifecycle ---------------------------------------
def on_session_start(self, session_id: str, **kwargs) -> None:
"""Called when a new conversation session begins.
Use this to load persisted state (DAG, store) for the session.
kwargs may include hermes_home, platform, model, etc.
"""
def on_session_end(self, session_id: str, messages: List[Dict[str, Any]]) -> None:
"""Called at real session boundaries (CLI exit, /reset, gateway expiry).
Use this to flush state, close DB connections, etc.
NOT called per-turn — only when the session truly ends.
"""
def on_session_reset(self) -> None:
"""Called on /new or /reset. Reset per-session state.
Default resets compression_count and token tracking.
"""
self.last_prompt_tokens = 0
self.last_completion_tokens = 0
self.last_total_tokens = 0
self.compression_count = 0
# -- Optional: tools ---------------------------------------------------
def get_tool_schemas(self) -> List[Dict[str, Any]]:
"""Return tool schemas this engine provides to the agent.
Default returns empty list (no tools). LCM would return schemas
for lcm_grep, lcm_describe, lcm_expand here.
"""
return []
def handle_tool_call(self, name: str, args: Dict[str, Any], **kwargs) -> str:
"""Handle a tool call from the agent.
Only called for tool names returned by get_tool_schemas().
Must return a JSON string.
kwargs may include:
messages: the current in-memory message list (for live ingestion)
"""
import json
return json.dumps({"error": f"Unknown context engine tool: {name}"})
# -- Optional: status / display ----------------------------------------
def get_status(self) -> Dict[str, Any]:
"""Return status dict for display/logging.
Default returns the standard fields run_agent.py expects.
"""
return {
"last_prompt_tokens": self.last_prompt_tokens,
"threshold_tokens": self.threshold_tokens,
"context_length": self.context_length,
"usage_percent": (
min(100, self.last_prompt_tokens / self.context_length * 100)
if self.context_length else 0
),
"compression_count": self.compression_count,
}
# -- Optional: model switch support ------------------------------------
def update_model(
self,
model: str,
context_length: int,
base_url: str = "",
api_key: str = "",
provider: str = "",
) -> None:
"""Called when the user switches models or on fallback activation.
Default updates context_length and recalculates threshold_tokens
from threshold_percent. Override if your engine needs more
(e.g. recalculate DAG budgets, switch summary models).
"""
self.context_length = context_length
self.threshold_tokens = int(context_length * self.threshold_percent)

View File

@@ -437,6 +437,16 @@ DEFAULT_CONFIG = {
"max_ms": 2500,
},
# Context engine -- controls how the context window is managed when
# approaching the model's token limit.
# "compressor" = built-in lossy summarization (default).
# Set to a plugin name to activate an alternative engine (e.g. "lcm"
# for Lossless Context Management). The engine must be installed as
# a plugin in plugins/context_engine/<name>/ or ~/.hermes/plugins/.
"context": {
"engine": "compressor",
},
# Persistent memory -- bounded curated memory injected into system prompt
"memory": {
"memory_enabled": True,
@@ -1338,7 +1348,7 @@ _KNOWN_ROOT_KEYS = {
"_config_version", "model", "providers", "fallback_model",
"fallback_providers", "credential_pool_strategies", "toolsets",
"agent", "terminal", "display", "compression", "delegation",
"auxiliary", "custom_providers", "memory", "gateway",
"auxiliary", "custom_providers", "context", "memory", "gateway",
}
# Valid fields inside a custom_providers list entry

View File

@@ -199,8 +199,7 @@ class PluginContext:
The *setup_fn* receives an argparse subparser and should add any
arguments/sub-subparsers. If *handler_fn* is provided it is set
as the default dispatch function via ``set_defaults(func=...)``.
"""
as the default dispatch function via ``set_defaults(func=...)``."""
self._manager._cli_commands[name] = {
"name": name,
"help": help,
@@ -211,6 +210,38 @@ class PluginContext:
}
logger.debug("Plugin %s registered CLI command: %s", self.manifest.name, name)
# -- context engine registration -----------------------------------------
def register_context_engine(self, engine) -> None:
"""Register a context engine to replace the built-in ContextCompressor.
Only one context engine plugin is allowed. If a second plugin tries
to register one, it is rejected with a warning.
The engine must be an instance of ``agent.context_engine.ContextEngine``.
"""
if self._manager._context_engine is not None:
logger.warning(
"Plugin '%s' tried to register a context engine, but one is "
"already registered. Only one context engine plugin is allowed.",
self.manifest.name,
)
return
# Defer the import to avoid circular deps at module level
from agent.context_engine import ContextEngine
if not isinstance(engine, ContextEngine):
logger.warning(
"Plugin '%s' tried to register a context engine that does not "
"inherit from ContextEngine. Ignoring.",
self.manifest.name,
)
return
self._manager._context_engine = engine
logger.info(
"Plugin '%s' registered context engine: %s",
self.manifest.name, engine.name,
)
# -- hook registration --------------------------------------------------
def register_hook(self, hook_name: str, callback: Callable) -> None:
@@ -243,6 +274,7 @@ class PluginManager:
self._hooks: Dict[str, List[Callable]] = {}
self._plugin_tool_names: Set[str] = set()
self._cli_commands: Dict[str, dict] = {}
self._context_engine = None # Set by a plugin via register_context_engine()
self._discovered: bool = False
self._cli_ref = None # Set by CLI after plugin discovery
@@ -564,6 +596,11 @@ def get_plugin_cli_commands() -> Dict[str, dict]:
return dict(get_plugin_manager()._cli_commands)
def get_plugin_context_engine():
"""Return the plugin-registered context engine, or None."""
return get_plugin_manager()._context_engine
def get_plugin_toolsets() -> List[tuple]:
"""Return plugin toolsets as ``(key, label, description)`` tuples.

View File

@@ -0,0 +1,219 @@
"""Context engine plugin discovery.
Scans ``plugins/context_engine/<name>/`` directories for context engine
plugins. Each subdirectory must contain ``__init__.py`` with a class
implementing the ContextEngine ABC.
Context engines are separate from the general plugin system — they live
in the repo and are always available without user installation. Only ONE
can be active at a time, selected via ``context.engine`` in config.yaml.
The default engine is ``"compressor"`` (the built-in ContextCompressor).
Usage:
from plugins.context_engine import discover_context_engines, load_context_engine
available = discover_context_engines() # [(name, desc, available), ...]
engine = load_context_engine("lcm") # ContextEngine instance
"""
from __future__ import annotations
import importlib
import importlib.util
import logging
import sys
from pathlib import Path
from typing import List, Optional, Tuple
logger = logging.getLogger(__name__)
_CONTEXT_ENGINE_PLUGINS_DIR = Path(__file__).parent
def discover_context_engines() -> List[Tuple[str, str, bool]]:
"""Scan plugins/context_engine/ for available engines.
Returns list of (name, description, is_available) tuples.
Does NOT import the engines — just reads plugin.yaml for metadata
and does a lightweight availability check.
"""
results = []
if not _CONTEXT_ENGINE_PLUGINS_DIR.is_dir():
return results
for child in sorted(_CONTEXT_ENGINE_PLUGINS_DIR.iterdir()):
if not child.is_dir() or child.name.startswith(("_", ".")):
continue
init_file = child / "__init__.py"
if not init_file.exists():
continue
# Read description from plugin.yaml if available
desc = ""
yaml_file = child / "plugin.yaml"
if yaml_file.exists():
try:
import yaml
with open(yaml_file) as f:
meta = yaml.safe_load(f) or {}
desc = meta.get("description", "")
except Exception:
pass
# Quick availability check — try loading and calling is_available()
available = True
try:
engine = _load_engine_from_dir(child)
if engine is None:
available = False
elif hasattr(engine, "is_available"):
available = engine.is_available()
except Exception:
available = False
results.append((child.name, desc, available))
return results
def load_context_engine(name: str) -> Optional["ContextEngine"]:
"""Load and return a ContextEngine instance by name.
Returns None if the engine is not found or fails to load.
"""
engine_dir = _CONTEXT_ENGINE_PLUGINS_DIR / name
if not engine_dir.is_dir():
logger.debug("Context engine '%s' not found in %s", name, _CONTEXT_ENGINE_PLUGINS_DIR)
return None
try:
engine = _load_engine_from_dir(engine_dir)
if engine:
return engine
logger.warning("Context engine '%s' loaded but no engine instance found", name)
return None
except Exception as e:
logger.warning("Failed to load context engine '%s': %s", name, e)
return None
def _load_engine_from_dir(engine_dir: Path) -> Optional["ContextEngine"]:
"""Import an engine module and extract the ContextEngine instance.
The module must have either:
- A register(ctx) function (plugin-style) — we simulate a ctx
- A top-level class that extends ContextEngine — we instantiate it
"""
name = engine_dir.name
module_name = f"plugins.context_engine.{name}"
init_file = engine_dir / "__init__.py"
if not init_file.exists():
return None
# Check if already loaded
if module_name in sys.modules:
mod = sys.modules[module_name]
else:
# Handle relative imports within the plugin
# First ensure the parent packages are registered
for parent in ("plugins", "plugins.context_engine"):
if parent not in sys.modules:
parent_path = Path(__file__).parent
if parent == "plugins":
parent_path = parent_path.parent
parent_init = parent_path / "__init__.py"
if parent_init.exists():
spec = importlib.util.spec_from_file_location(
parent, str(parent_init),
submodule_search_locations=[str(parent_path)]
)
if spec:
parent_mod = importlib.util.module_from_spec(spec)
sys.modules[parent] = parent_mod
try:
spec.loader.exec_module(parent_mod)
except Exception:
pass
# Now load the engine module
spec = importlib.util.spec_from_file_location(
module_name, str(init_file),
submodule_search_locations=[str(engine_dir)]
)
if not spec:
return None
mod = importlib.util.module_from_spec(spec)
sys.modules[module_name] = mod
# Register submodules so relative imports work
for sub_file in engine_dir.glob("*.py"):
if sub_file.name == "__init__.py":
continue
sub_name = sub_file.stem
full_sub_name = f"{module_name}.{sub_name}"
if full_sub_name not in sys.modules:
sub_spec = importlib.util.spec_from_file_location(
full_sub_name, str(sub_file)
)
if sub_spec:
sub_mod = importlib.util.module_from_spec(sub_spec)
sys.modules[full_sub_name] = sub_mod
try:
sub_spec.loader.exec_module(sub_mod)
except Exception as e:
logger.debug("Failed to load submodule %s: %s", full_sub_name, e)
try:
spec.loader.exec_module(mod)
except Exception as e:
logger.debug("Failed to exec_module %s: %s", module_name, e)
sys.modules.pop(module_name, None)
return None
# Try register(ctx) pattern first (how plugins are written)
if hasattr(mod, "register"):
collector = _EngineCollector()
try:
mod.register(collector)
if collector.engine:
return collector.engine
except Exception as e:
logger.debug("register() failed for %s: %s", name, e)
# Fallback: find a ContextEngine subclass and instantiate it
from agent.context_engine import ContextEngine
for attr_name in dir(mod):
attr = getattr(mod, attr_name, None)
if (isinstance(attr, type) and issubclass(attr, ContextEngine)
and attr is not ContextEngine):
try:
return attr()
except Exception:
pass
return None
class _EngineCollector:
"""Fake plugin context that captures register_context_engine calls."""
def __init__(self):
self.engine = None
def register_context_engine(self, engine):
self.engine = engine
# No-op for other registration methods
def register_tool(self, *args, **kwargs):
pass
def register_hook(self, *args, **kwargs):
pass
def register_cli_command(self, *args, **kwargs):
pass
def register_memory_provider(self, *args, **kwargs):
pass

View File

@@ -1129,20 +1129,94 @@ class AIAgent:
pass
break
self.context_compressor = ContextCompressor(
model=self.model,
threshold_percent=compression_threshold,
protect_first_n=3,
protect_last_n=compression_protect_last,
summary_target_ratio=compression_target_ratio,
summary_model_override=compression_summary_model,
quiet_mode=self.quiet_mode,
base_url=self.base_url,
api_key=getattr(self, "api_key", ""),
config_context_length=_config_context_length,
provider=self.provider,
)
# Select context engine: config-driven (like memory providers).
# 1. Check config.yaml context.engine setting
# 2. Check plugins/context_engine/<name>/ directory (repo-shipped)
# 3. Check general plugin system (user-installed plugins)
# 4. Fall back to built-in ContextCompressor
_selected_engine = None
_engine_name = "compressor" # default
try:
_ctx_cfg = _agent_cfg.get("context", {}) if isinstance(_agent_cfg, dict) else {}
_engine_name = _ctx_cfg.get("engine", "compressor") or "compressor"
except Exception:
pass
if _engine_name != "compressor":
# Try loading from plugins/context_engine/<name>/
try:
from plugins.context_engine import load_context_engine
_selected_engine = load_context_engine(_engine_name)
except Exception as _ce_load_err:
logger.debug("Context engine load from plugins/context_engine/: %s", _ce_load_err)
# Try general plugin system as fallback
if _selected_engine is None:
try:
from hermes_cli.plugins import get_plugin_context_engine
_candidate = get_plugin_context_engine()
if _candidate and _candidate.name == _engine_name:
_selected_engine = _candidate
except Exception:
pass
if _selected_engine is None:
logger.warning(
"Context engine '%s' not found — falling back to built-in compressor",
_engine_name,
)
else:
# Even with default config, check if a plugin registered one
try:
from hermes_cli.plugins import get_plugin_context_engine
_selected_engine = get_plugin_context_engine()
except Exception:
pass
if _selected_engine is not None:
self.context_compressor = _selected_engine
if not self.quiet_mode:
logger.info("Using context engine: %s", _selected_engine.name)
else:
self.context_compressor = ContextCompressor(
model=self.model,
threshold_percent=compression_threshold,
protect_first_n=3,
protect_last_n=compression_protect_last,
summary_target_ratio=compression_target_ratio,
summary_model_override=compression_summary_model,
quiet_mode=self.quiet_mode,
base_url=self.base_url,
api_key=getattr(self, "api_key", ""),
config_context_length=_config_context_length,
provider=self.provider,
)
self.compression_enabled = compression_enabled
# Inject context engine tool schemas (e.g. lcm_grep, lcm_describe, lcm_expand)
self._context_engine_tool_names: set = set()
if hasattr(self, "context_compressor") and self.context_compressor and self.tools is not None:
for _schema in self.context_compressor.get_tool_schemas():
_wrapped = {"type": "function", "function": _schema}
self.tools.append(_wrapped)
_tname = _schema.get("name", "")
if _tname:
self.valid_tool_names.add(_tname)
self._context_engine_tool_names.add(_tname)
# Notify context engine of session start
if hasattr(self, "context_compressor") and self.context_compressor:
try:
self.context_compressor.on_session_start(
self.session_id,
hermes_home=str(get_hermes_home()),
platform=self.platform or "cli",
model=self.model,
context_length=getattr(self.context_compressor, "context_length", 0),
)
except Exception as _ce_err:
logger.debug("Context engine on_session_start: %s", _ce_err)
self._subdirectory_hints = SubdirectoryHintTracker(
working_dir=os.getenv("TERMINAL_CWD") or None,
)
@@ -1208,11 +1282,13 @@ class AIAgent:
"api_key": getattr(self, "api_key", ""),
"client_kwargs": dict(self._client_kwargs),
"use_prompt_caching": self._use_prompt_caching,
# Compressor state that _try_activate_fallback() overwrites
"compressor_model": _cc.model,
"compressor_base_url": _cc.base_url,
# Context engine state that _try_activate_fallback() overwrites.
# Use getattr for model/base_url/api_key/provider since plugin
# engines may not have these (they're ContextCompressor-specific).
"compressor_model": getattr(_cc, "model", self.model),
"compressor_base_url": getattr(_cc, "base_url", self.base_url),
"compressor_api_key": getattr(_cc, "api_key", ""),
"compressor_provider": _cc.provider,
"compressor_provider": getattr(_cc, "provider", self.provider),
"compressor_context_length": _cc.context_length,
"compressor_threshold_tokens": _cc.threshold_tokens,
}
@@ -1258,16 +1334,9 @@ class AIAgent:
# Turn counter (added after reset_session_state was first written — #2635)
self._user_turn_count = 0
# Context compressor internal counters (if present)
# Context engine reset (works for both built-in compressor and plugins)
if hasattr(self, "context_compressor") and self.context_compressor:
self.context_compressor.last_prompt_tokens = 0
self.context_compressor.last_completion_tokens = 0
self.context_compressor.last_total_tokens = 0
self.context_compressor.compression_count = 0
self.context_compressor._context_probed = False
self.context_compressor._context_probe_persistable = False
# Iterative summary from previous session must not bleed into new one (#2635)
self.context_compressor._previous_summary = None
self.context_compressor.on_session_reset()
def switch_model(self, new_model, new_provider, api_key='', base_url='', api_mode=''):
"""Switch the model/provider in-place for a live agent.
@@ -1347,13 +1416,12 @@ class AIAgent:
api_key=self.api_key,
provider=self.provider,
)
self.context_compressor.model = self.model
self.context_compressor.base_url = self.base_url
self.context_compressor.api_key = self.api_key
self.context_compressor.provider = self.provider
self.context_compressor.context_length = new_context_length
self.context_compressor.threshold_tokens = int(
new_context_length * self.context_compressor.threshold_percent
self.context_compressor.update_model(
model=self.model,
context_length=new_context_length,
base_url=self.base_url,
api_key=getattr(self, "api_key", ""),
provider=self.provider,
)
# ── Invalidate cached system prompt so it rebuilds next turn ──
@@ -1369,10 +1437,10 @@ class AIAgent:
"api_key": getattr(self, "api_key", ""),
"client_kwargs": dict(self._client_kwargs),
"use_prompt_caching": self._use_prompt_caching,
"compressor_model": _cc.model if _cc else self.model,
"compressor_base_url": _cc.base_url if _cc else self.base_url,
"compressor_model": getattr(_cc, "model", self.model) if _cc else self.model,
"compressor_base_url": getattr(_cc, "base_url", self.base_url) if _cc else self.base_url,
"compressor_api_key": getattr(_cc, "api_key", "") if _cc else "",
"compressor_provider": _cc.provider if _cc else self.provider,
"compressor_provider": getattr(_cc, "provider", self.provider) if _cc else self.provider,
"compressor_context_length": _cc.context_length if _cc else 0,
"compressor_threshold_tokens": _cc.threshold_tokens if _cc else 0,
}
@@ -2517,10 +2585,11 @@ class AIAgent:
}
def shutdown_memory_provider(self, messages: list = None) -> None:
"""Shut down the memory provider — call at actual session boundaries.
"""Shut down the memory provider and context engine — call at actual session boundaries.
This calls on_session_end() then shutdown_all() on the memory
manager. NOT called per-turn — only at CLI exit, /reset, gateway
manager, and on_session_end() on the context engine.
NOT called per-turn — only at CLI exit, /reset, gateway
session expiry, etc.
"""
if self._memory_manager:
@@ -2532,6 +2601,15 @@ class AIAgent:
self._memory_manager.shutdown_all()
except Exception:
pass
# Notify context engine of session end (flush DAG, close DBs, etc.)
if hasattr(self, "context_compressor") and self.context_compressor:
try:
self.context_compressor.on_session_end(
self.session_id or "",
messages or [],
)
except Exception:
pass
def _hydrate_todo_store(self, history: List[Dict[str, Any]]) -> None:
"""
@@ -4901,13 +4979,12 @@ class AIAgent:
self.model, base_url=self.base_url,
api_key=self.api_key, provider=self.provider,
)
self.context_compressor.model = self.model
self.context_compressor.base_url = self.base_url
self.context_compressor.api_key = self.api_key
self.context_compressor.provider = self.provider
self.context_compressor.context_length = fb_context_length
self.context_compressor.threshold_tokens = int(
fb_context_length * self.context_compressor.threshold_percent
self.context_compressor.update_model(
model=self.model,
context_length=fb_context_length,
base_url=self.base_url,
api_key=getattr(self, "api_key", ""),
provider=self.provider,
)
self._emit_status(
@@ -4967,14 +5044,15 @@ class AIAgent:
shared=True,
)
# ── Restore context compressor state ──
# ── Restore context engine state ──
cc = self.context_compressor
cc.model = rt["compressor_model"]
cc.base_url = rt["compressor_base_url"]
cc.api_key = rt["compressor_api_key"]
cc.provider = rt["compressor_provider"]
cc.context_length = rt["compressor_context_length"]
cc.threshold_tokens = rt["compressor_threshold_tokens"]
cc.update_model(
model=rt["compressor_model"],
context_length=rt["compressor_context_length"],
base_url=rt["compressor_base_url"],
api_key=rt["compressor_api_key"],
provider=rt["compressor_provider"],
)
# ── Reset fallback chain for the new turn ──
self._fallback_activated = False
@@ -6416,6 +6494,29 @@ class AIAgent:
spinner.stop(cute_msg)
elif self.quiet_mode:
self._vprint(f" {cute_msg}")
elif self._context_engine_tool_names and function_name in self._context_engine_tool_names:
# Context engine tools (lcm_grep, lcm_describe, lcm_expand, etc.)
spinner = None
if self.quiet_mode and not self.tool_progress_callback:
face = random.choice(KawaiiSpinner.KAWAII_WAITING)
emoji = _get_tool_emoji(function_name)
preview = _build_tool_preview(function_name, function_args) or function_name
spinner = KawaiiSpinner(f"{face} {emoji} {preview}", spinner_type='dots', print_fn=self._print_fn)
spinner.start()
_ce_result = None
try:
function_result = self.context_compressor.handle_tool_call(function_name, function_args, messages=messages)
_ce_result = function_result
except Exception as tool_error:
function_result = json.dumps({"error": f"Context engine tool '{function_name}' failed: {tool_error}"})
logger.error("context_engine.handle_tool_call raised for %s: %s", function_name, tool_error, exc_info=True)
finally:
tool_duration = time.time() - tool_start_time
cute_msg = _get_cute_tool_message_impl(function_name, function_args, tool_duration, result=_ce_result)
if spinner:
spinner.stop(cute_msg)
elif self.quiet_mode:
self._vprint(f" {cute_msg}")
elif self._memory_manager and self._memory_manager.has_tool(function_name):
# Memory provider tools (hindsight_retain, honcho_search, etc.)
# These are not in the tool registry — route through MemoryManager.
@@ -7693,7 +7794,7 @@ class AIAgent:
# Cache discovered context length after successful call.
# Only persist limits confirmed by the provider (parsed
# from the error message), not guessed probe tiers.
if self.context_compressor._context_probed:
if getattr(self.context_compressor, "_context_probed", False):
ctx = self.context_compressor.context_length
if getattr(self.context_compressor, "_context_probe_persistable", False):
save_context_length(self.model, self.base_url, ctx)
@@ -7978,16 +8079,22 @@ class AIAgent:
compressor = self.context_compressor
old_ctx = compressor.context_length
if old_ctx > _reduced_ctx:
compressor.context_length = _reduced_ctx
compressor.threshold_tokens = int(
_reduced_ctx * compressor.threshold_percent
compressor.update_model(
model=self.model,
context_length=_reduced_ctx,
base_url=self.base_url,
api_key=getattr(self, "api_key", ""),
provider=self.provider,
)
compressor._context_probed = True
# Don't persist — this is a subscription-tier
# limitation, not a model capability. If the user
# later enables extra usage the 1M limit should
# come back automatically.
compressor._context_probe_persistable = False
# Context probing flags — only set on built-in
# compressor (plugin engines manage their own).
if hasattr(compressor, "_context_probed"):
compressor._context_probed = True
# Don't persist — this is a subscription-tier
# limitation, not a model capability. If the
# user later enables extra usage the 1M limit
# should come back automatically.
compressor._context_probe_persistable = False
self._vprint(
f"{self.log_prefix}⚠️ Anthropic long-context tier "
f"requires extra usage — reducing context: "
@@ -8160,17 +8267,25 @@ class AIAgent:
new_ctx = get_next_probe_tier(old_ctx)
if new_ctx and new_ctx < old_ctx:
compressor.context_length = new_ctx
compressor.threshold_tokens = int(new_ctx * compressor.threshold_percent)
compressor._context_probed = True
# Only persist limits parsed from the provider's
# error message (a real number). Guessed fallback
# tiers from get_next_probe_tier() should stay
# in-memory only — persisting them pollutes the
# cache with wrong values.
compressor._context_probe_persistable = bool(
parsed_limit and parsed_limit == new_ctx
compressor.update_model(
model=self.model,
context_length=new_ctx,
base_url=self.base_url,
api_key=getattr(self, "api_key", ""),
provider=self.provider,
)
# Context probing flags — only set on built-in
# compressor (plugin engines manage their own).
if hasattr(compressor, "_context_probed"):
compressor._context_probed = True
# Only persist limits parsed from the provider's
# error message (a real number). Guessed fallback
# tiers from get_next_probe_tier() should stay
# in-memory only — persisting them pollutes the
# cache with wrong values.
compressor._context_probe_persistable = bool(
parsed_limit and parsed_limit == new_ctx
)
self._vprint(f"{self.log_prefix}⚠️ Context length exceeded — stepping down: {old_ctx:,}{new_ctx:,} tokens", force=True)
else:
self._vprint(f"{self.log_prefix}⚠️ Context length exceeded at minimum tier — attempting compression...", force=True)

View File

@@ -0,0 +1,250 @@
"""Tests for the ContextEngine ABC and plugin slot."""
import json
import pytest
from typing import Any, Dict, List
from agent.context_engine import ContextEngine
from agent.context_compressor import ContextCompressor
# ---------------------------------------------------------------------------
# A minimal concrete engine for testing the ABC
# ---------------------------------------------------------------------------
class StubEngine(ContextEngine):
"""Minimal engine that satisfies the ABC without doing real work."""
def __init__(self, context_length=200000, threshold_pct=0.50):
self.context_length = context_length
self.threshold_tokens = int(context_length * threshold_pct)
self._compress_called = False
self._tools_called = []
@property
def name(self) -> str:
return "stub"
def update_from_response(self, usage: Dict[str, Any]) -> None:
self.last_prompt_tokens = usage.get("prompt_tokens", 0)
self.last_completion_tokens = usage.get("completion_tokens", 0)
self.last_total_tokens = usage.get("total_tokens", 0)
def should_compress(self, prompt_tokens: int = None) -> bool:
tokens = prompt_tokens if prompt_tokens is not None else self.last_prompt_tokens
return tokens >= self.threshold_tokens
def compress(self, messages: List[Dict[str, Any]], current_tokens: int = None) -> List[Dict[str, Any]]:
self._compress_called = True
self.compression_count += 1
# Trivial: just return as-is
return messages
def get_tool_schemas(self) -> List[Dict[str, Any]]:
return [
{
"name": "stub_search",
"description": "Search the stub engine",
"parameters": {"type": "object", "properties": {}},
}
]
def handle_tool_call(self, name: str, args: Dict[str, Any]) -> str:
self._tools_called.append(name)
return json.dumps({"ok": True, "tool": name})
# ---------------------------------------------------------------------------
# ABC contract tests
# ---------------------------------------------------------------------------
class TestContextEngineABC:
"""Verify the ABC enforces the required interface."""
def test_cannot_instantiate_abc_directly(self):
with pytest.raises(TypeError):
ContextEngine()
def test_missing_methods_raises(self):
"""A subclass missing required methods cannot be instantiated."""
class Incomplete(ContextEngine):
@property
def name(self):
return "incomplete"
with pytest.raises(TypeError):
Incomplete()
def test_stub_engine_satisfies_abc(self):
engine = StubEngine()
assert isinstance(engine, ContextEngine)
assert engine.name == "stub"
def test_compressor_is_context_engine(self):
c = ContextCompressor(model="test", quiet_mode=True, config_context_length=200000)
assert isinstance(c, ContextEngine)
assert c.name == "compressor"
# ---------------------------------------------------------------------------
# Default method behavior
# ---------------------------------------------------------------------------
class TestDefaults:
"""Verify ABC default implementations work correctly."""
def test_default_tool_schemas_empty(self):
engine = StubEngine()
# StubEngine overrides this, so test the base via super
assert ContextEngine.get_tool_schemas(engine) == []
def test_default_handle_tool_call_returns_error(self):
engine = StubEngine()
result = ContextEngine.handle_tool_call(engine, "unknown", {})
data = json.loads(result)
assert "error" in data
def test_default_get_status(self):
engine = StubEngine()
engine.last_prompt_tokens = 50000
status = engine.get_status()
assert status["last_prompt_tokens"] == 50000
assert status["context_length"] == 200000
assert status["threshold_tokens"] == 100000
assert 0 < status["usage_percent"] <= 100
def test_on_session_reset(self):
engine = StubEngine()
engine.last_prompt_tokens = 999
engine.compression_count = 3
engine.on_session_reset()
assert engine.last_prompt_tokens == 0
assert engine.compression_count == 0
def test_should_compress_preflight_default_false(self):
engine = StubEngine()
assert engine.should_compress_preflight([]) is False
# ---------------------------------------------------------------------------
# StubEngine behavior
# ---------------------------------------------------------------------------
class TestStubEngine:
def test_should_compress(self):
engine = StubEngine(context_length=100000, threshold_pct=0.50)
assert not engine.should_compress(40000)
assert engine.should_compress(50000)
assert engine.should_compress(60000)
def test_compress_tracks_count(self):
engine = StubEngine()
msgs = [{"role": "user", "content": "hello"}]
result = engine.compress(msgs)
assert result == msgs
assert engine._compress_called
assert engine.compression_count == 1
def test_tool_schemas(self):
engine = StubEngine()
schemas = engine.get_tool_schemas()
assert len(schemas) == 1
assert schemas[0]["name"] == "stub_search"
def test_handle_tool_call(self):
engine = StubEngine()
result = engine.handle_tool_call("stub_search", {})
assert json.loads(result)["ok"] is True
assert "stub_search" in engine._tools_called
def test_update_from_response(self):
engine = StubEngine()
engine.update_from_response({"prompt_tokens": 1000, "completion_tokens": 200, "total_tokens": 1200})
assert engine.last_prompt_tokens == 1000
assert engine.last_completion_tokens == 200
# ---------------------------------------------------------------------------
# ContextCompressor session reset via ABC
# ---------------------------------------------------------------------------
class TestCompressorSessionReset:
"""Verify ContextCompressor.on_session_reset() clears all state."""
def test_reset_clears_state(self):
c = ContextCompressor(model="test", quiet_mode=True, config_context_length=200000)
c.last_prompt_tokens = 50000
c.compression_count = 3
c._previous_summary = "some old summary"
c._context_probed = True
c._context_probe_persistable = True
c.on_session_reset()
assert c.last_prompt_tokens == 0
assert c.last_completion_tokens == 0
assert c.last_total_tokens == 0
assert c.compression_count == 0
assert c._context_probed is False
assert c._context_probe_persistable is False
assert c._previous_summary is None
# ---------------------------------------------------------------------------
# Plugin slot (PluginManager integration)
# ---------------------------------------------------------------------------
class TestPluginContextEngineSlot:
"""Test register_context_engine on PluginContext."""
def test_register_engine(self):
from hermes_cli.plugins import PluginManager, PluginContext, PluginManifest
mgr = PluginManager()
manifest = PluginManifest(name="test-lcm")
ctx = PluginContext(manifest, mgr)
engine = StubEngine()
ctx.register_context_engine(engine)
assert mgr._context_engine is engine
assert mgr._context_engine.name == "stub"
def test_reject_second_engine(self):
from hermes_cli.plugins import PluginManager, PluginContext, PluginManifest
mgr = PluginManager()
manifest = PluginManifest(name="test-lcm")
ctx = PluginContext(manifest, mgr)
engine1 = StubEngine()
engine2 = StubEngine()
ctx.register_context_engine(engine1)
ctx.register_context_engine(engine2) # should be rejected
assert mgr._context_engine is engine1
def test_reject_non_engine(self):
from hermes_cli.plugins import PluginManager, PluginContext, PluginManifest
mgr = PluginManager()
manifest = PluginManifest(name="test-bad")
ctx = PluginContext(manifest, mgr)
ctx.register_context_engine("not an engine")
assert mgr._context_engine is None
def test_get_plugin_context_engine(self):
from hermes_cli.plugins import PluginManager, PluginContext, PluginManifest, get_plugin_context_engine, _plugin_manager
import hermes_cli.plugins as plugins_mod
# Inject a test manager
old_mgr = plugins_mod._plugin_manager
try:
mgr = PluginManager()
plugins_mod._plugin_manager = mgr
assert get_plugin_context_engine() is None
engine = StubEngine()
mgr._context_engine = engine
assert get_plugin_context_engine() is engine
finally:
plugins_mod._plugin_manager = old_mgr