mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-29 07:21:37 +08:00
Compare commits
4 Commits
fix/plugin
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d0d57bcde1 | ||
|
|
51751cdaf4 | ||
|
|
e7209789b9 | ||
|
|
ff95ec1c54 |
@@ -18,6 +18,7 @@ import time
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from agent.auxiliary_client import call_llm
|
||||
from agent.context_engine import ContextEngine
|
||||
from agent.model_metadata import (
|
||||
get_model_context_length,
|
||||
estimate_messages_tokens_rough,
|
||||
@@ -50,8 +51,8 @@ _CHARS_PER_TOKEN = 4
|
||||
_SUMMARY_FAILURE_COOLDOWN_SECONDS = 600
|
||||
|
||||
|
||||
class ContextCompressor:
|
||||
"""Compresses conversation context when approaching the model's context limit.
|
||||
class ContextCompressor(ContextEngine):
|
||||
"""Default context engine — compresses conversation context via lossy summarization.
|
||||
|
||||
Algorithm:
|
||||
1. Prune old tool results (cheap, no LLM call)
|
||||
@@ -61,6 +62,33 @@ class ContextCompressor:
|
||||
5. On subsequent compactions, iteratively update the previous summary
|
||||
"""
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "compressor"
|
||||
|
||||
def on_session_reset(self) -> None:
|
||||
"""Reset all per-session state for /new or /reset."""
|
||||
super().on_session_reset()
|
||||
self._context_probed = False
|
||||
self._context_probe_persistable = False
|
||||
self._previous_summary = None
|
||||
|
||||
def update_model(
|
||||
self,
|
||||
model: str,
|
||||
context_length: int,
|
||||
base_url: str = "",
|
||||
api_key: str = "",
|
||||
provider: str = "",
|
||||
) -> None:
|
||||
"""Update model info after a model switch or fallback activation."""
|
||||
self.model = model
|
||||
self.base_url = base_url
|
||||
self.api_key = api_key
|
||||
self.provider = provider
|
||||
self.context_length = context_length
|
||||
self.threshold_tokens = int(context_length * self.threshold_percent)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
model: str,
|
||||
|
||||
184
agent/context_engine.py
Normal file
184
agent/context_engine.py
Normal file
@@ -0,0 +1,184 @@
|
||||
"""Abstract base class for pluggable context engines.
|
||||
|
||||
A context engine controls how conversation context is managed when
|
||||
approaching the model's token limit. The built-in ContextCompressor
|
||||
is the default implementation. Third-party engines (e.g. LCM) can
|
||||
replace it via the plugin system or by being placed in the
|
||||
``plugins/context_engine/<name>/`` directory.
|
||||
|
||||
Selection is config-driven: ``context.engine`` in config.yaml.
|
||||
Default is ``"compressor"`` (the built-in). Only one engine is active.
|
||||
|
||||
The engine is responsible for:
|
||||
- Deciding when compaction should fire
|
||||
- Performing compaction (summarization, DAG construction, etc.)
|
||||
- Optionally exposing tools the agent can call (e.g. lcm_grep)
|
||||
- Tracking token usage from API responses
|
||||
|
||||
Lifecycle:
|
||||
1. Engine is instantiated and registered (plugin register() or default)
|
||||
2. on_session_start() called when a conversation begins
|
||||
3. update_from_response() called after each API response with usage data
|
||||
4. should_compress() checked after each turn
|
||||
5. compress() called when should_compress() returns True
|
||||
6. on_session_end() called at real session boundaries (CLI exit, /reset,
|
||||
gateway session expiry) — NOT per-turn
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
class ContextEngine(ABC):
|
||||
"""Base class all context engines must implement."""
|
||||
|
||||
# -- Identity ----------------------------------------------------------
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def name(self) -> str:
|
||||
"""Short identifier (e.g. 'compressor', 'lcm')."""
|
||||
|
||||
# -- Token state (read by run_agent.py for display/logging) ------------
|
||||
#
|
||||
# Engines MUST maintain these. run_agent.py reads them directly.
|
||||
|
||||
last_prompt_tokens: int = 0
|
||||
last_completion_tokens: int = 0
|
||||
last_total_tokens: int = 0
|
||||
threshold_tokens: int = 0
|
||||
context_length: int = 0
|
||||
compression_count: int = 0
|
||||
|
||||
# -- Compaction parameters (read by run_agent.py for preflight) --------
|
||||
#
|
||||
# These control the preflight compression check. Subclasses may
|
||||
# override via __init__ or property; defaults are sensible for most
|
||||
# engines.
|
||||
|
||||
threshold_percent: float = 0.75
|
||||
protect_first_n: int = 3
|
||||
protect_last_n: int = 6
|
||||
|
||||
# -- Core interface ----------------------------------------------------
|
||||
|
||||
@abstractmethod
|
||||
def update_from_response(self, usage: Dict[str, Any]) -> None:
|
||||
"""Update tracked token usage from an API response.
|
||||
|
||||
Called after every LLM call with the usage dict from the response.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def should_compress(self, prompt_tokens: int = None) -> bool:
|
||||
"""Return True if compaction should fire this turn."""
|
||||
|
||||
@abstractmethod
|
||||
def compress(
|
||||
self,
|
||||
messages: List[Dict[str, Any]],
|
||||
current_tokens: int = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Compact the message list and return the new message list.
|
||||
|
||||
This is the main entry point. The engine receives the full message
|
||||
list and returns a (possibly shorter) list that fits within the
|
||||
context budget. The implementation is free to summarize, build a
|
||||
DAG, or do anything else — as long as the returned list is a valid
|
||||
OpenAI-format message sequence.
|
||||
"""
|
||||
|
||||
# -- Optional: pre-flight check ----------------------------------------
|
||||
|
||||
def should_compress_preflight(self, messages: List[Dict[str, Any]]) -> bool:
|
||||
"""Quick rough check before the API call (no real token count yet).
|
||||
|
||||
Default returns False (skip pre-flight). Override if your engine
|
||||
can do a cheap estimate.
|
||||
"""
|
||||
return False
|
||||
|
||||
# -- Optional: session lifecycle ---------------------------------------
|
||||
|
||||
def on_session_start(self, session_id: str, **kwargs) -> None:
|
||||
"""Called when a new conversation session begins.
|
||||
|
||||
Use this to load persisted state (DAG, store) for the session.
|
||||
kwargs may include hermes_home, platform, model, etc.
|
||||
"""
|
||||
|
||||
def on_session_end(self, session_id: str, messages: List[Dict[str, Any]]) -> None:
|
||||
"""Called at real session boundaries (CLI exit, /reset, gateway expiry).
|
||||
|
||||
Use this to flush state, close DB connections, etc.
|
||||
NOT called per-turn — only when the session truly ends.
|
||||
"""
|
||||
|
||||
def on_session_reset(self) -> None:
|
||||
"""Called on /new or /reset. Reset per-session state.
|
||||
|
||||
Default resets compression_count and token tracking.
|
||||
"""
|
||||
self.last_prompt_tokens = 0
|
||||
self.last_completion_tokens = 0
|
||||
self.last_total_tokens = 0
|
||||
self.compression_count = 0
|
||||
|
||||
# -- Optional: tools ---------------------------------------------------
|
||||
|
||||
def get_tool_schemas(self) -> List[Dict[str, Any]]:
|
||||
"""Return tool schemas this engine provides to the agent.
|
||||
|
||||
Default returns empty list (no tools). LCM would return schemas
|
||||
for lcm_grep, lcm_describe, lcm_expand here.
|
||||
"""
|
||||
return []
|
||||
|
||||
def handle_tool_call(self, name: str, args: Dict[str, Any], **kwargs) -> str:
|
||||
"""Handle a tool call from the agent.
|
||||
|
||||
Only called for tool names returned by get_tool_schemas().
|
||||
Must return a JSON string.
|
||||
|
||||
kwargs may include:
|
||||
messages: the current in-memory message list (for live ingestion)
|
||||
"""
|
||||
import json
|
||||
return json.dumps({"error": f"Unknown context engine tool: {name}"})
|
||||
|
||||
# -- Optional: status / display ----------------------------------------
|
||||
|
||||
def get_status(self) -> Dict[str, Any]:
|
||||
"""Return status dict for display/logging.
|
||||
|
||||
Default returns the standard fields run_agent.py expects.
|
||||
"""
|
||||
return {
|
||||
"last_prompt_tokens": self.last_prompt_tokens,
|
||||
"threshold_tokens": self.threshold_tokens,
|
||||
"context_length": self.context_length,
|
||||
"usage_percent": (
|
||||
min(100, self.last_prompt_tokens / self.context_length * 100)
|
||||
if self.context_length else 0
|
||||
),
|
||||
"compression_count": self.compression_count,
|
||||
}
|
||||
|
||||
# -- Optional: model switch support ------------------------------------
|
||||
|
||||
def update_model(
|
||||
self,
|
||||
model: str,
|
||||
context_length: int,
|
||||
base_url: str = "",
|
||||
api_key: str = "",
|
||||
provider: str = "",
|
||||
) -> None:
|
||||
"""Called when the user switches models or on fallback activation.
|
||||
|
||||
Default updates context_length and recalculates threshold_tokens
|
||||
from threshold_percent. Override if your engine needs more
|
||||
(e.g. recalculate DAG budgets, switch summary models).
|
||||
"""
|
||||
self.context_length = context_length
|
||||
self.threshold_tokens = int(context_length * self.threshold_percent)
|
||||
@@ -437,6 +437,16 @@ DEFAULT_CONFIG = {
|
||||
"max_ms": 2500,
|
||||
},
|
||||
|
||||
# Context engine -- controls how the context window is managed when
|
||||
# approaching the model's token limit.
|
||||
# "compressor" = built-in lossy summarization (default).
|
||||
# Set to a plugin name to activate an alternative engine (e.g. "lcm"
|
||||
# for Lossless Context Management). The engine must be installed as
|
||||
# a plugin in plugins/context_engine/<name>/ or ~/.hermes/plugins/.
|
||||
"context": {
|
||||
"engine": "compressor",
|
||||
},
|
||||
|
||||
# Persistent memory -- bounded curated memory injected into system prompt
|
||||
"memory": {
|
||||
"memory_enabled": True,
|
||||
@@ -1338,7 +1348,7 @@ _KNOWN_ROOT_KEYS = {
|
||||
"_config_version", "model", "providers", "fallback_model",
|
||||
"fallback_providers", "credential_pool_strategies", "toolsets",
|
||||
"agent", "terminal", "display", "compression", "delegation",
|
||||
"auxiliary", "custom_providers", "memory", "gateway",
|
||||
"auxiliary", "custom_providers", "context", "memory", "gateway",
|
||||
}
|
||||
|
||||
# Valid fields inside a custom_providers list entry
|
||||
|
||||
@@ -199,8 +199,7 @@ class PluginContext:
|
||||
|
||||
The *setup_fn* receives an argparse subparser and should add any
|
||||
arguments/sub-subparsers. If *handler_fn* is provided it is set
|
||||
as the default dispatch function via ``set_defaults(func=...)``.
|
||||
"""
|
||||
as the default dispatch function via ``set_defaults(func=...)``."""
|
||||
self._manager._cli_commands[name] = {
|
||||
"name": name,
|
||||
"help": help,
|
||||
@@ -211,6 +210,38 @@ class PluginContext:
|
||||
}
|
||||
logger.debug("Plugin %s registered CLI command: %s", self.manifest.name, name)
|
||||
|
||||
# -- context engine registration -----------------------------------------
|
||||
|
||||
def register_context_engine(self, engine) -> None:
|
||||
"""Register a context engine to replace the built-in ContextCompressor.
|
||||
|
||||
Only one context engine plugin is allowed. If a second plugin tries
|
||||
to register one, it is rejected with a warning.
|
||||
|
||||
The engine must be an instance of ``agent.context_engine.ContextEngine``.
|
||||
"""
|
||||
if self._manager._context_engine is not None:
|
||||
logger.warning(
|
||||
"Plugin '%s' tried to register a context engine, but one is "
|
||||
"already registered. Only one context engine plugin is allowed.",
|
||||
self.manifest.name,
|
||||
)
|
||||
return
|
||||
# Defer the import to avoid circular deps at module level
|
||||
from agent.context_engine import ContextEngine
|
||||
if not isinstance(engine, ContextEngine):
|
||||
logger.warning(
|
||||
"Plugin '%s' tried to register a context engine that does not "
|
||||
"inherit from ContextEngine. Ignoring.",
|
||||
self.manifest.name,
|
||||
)
|
||||
return
|
||||
self._manager._context_engine = engine
|
||||
logger.info(
|
||||
"Plugin '%s' registered context engine: %s",
|
||||
self.manifest.name, engine.name,
|
||||
)
|
||||
|
||||
# -- hook registration --------------------------------------------------
|
||||
|
||||
def register_hook(self, hook_name: str, callback: Callable) -> None:
|
||||
@@ -243,6 +274,7 @@ class PluginManager:
|
||||
self._hooks: Dict[str, List[Callable]] = {}
|
||||
self._plugin_tool_names: Set[str] = set()
|
||||
self._cli_commands: Dict[str, dict] = {}
|
||||
self._context_engine = None # Set by a plugin via register_context_engine()
|
||||
self._discovered: bool = False
|
||||
self._cli_ref = None # Set by CLI after plugin discovery
|
||||
|
||||
@@ -564,6 +596,11 @@ def get_plugin_cli_commands() -> Dict[str, dict]:
|
||||
return dict(get_plugin_manager()._cli_commands)
|
||||
|
||||
|
||||
def get_plugin_context_engine():
|
||||
"""Return the plugin-registered context engine, or None."""
|
||||
return get_plugin_manager()._context_engine
|
||||
|
||||
|
||||
def get_plugin_toolsets() -> List[tuple]:
|
||||
"""Return plugin toolsets as ``(key, label, description)`` tuples.
|
||||
|
||||
|
||||
219
plugins/context_engine/__init__.py
Normal file
219
plugins/context_engine/__init__.py
Normal file
@@ -0,0 +1,219 @@
|
||||
"""Context engine plugin discovery.
|
||||
|
||||
Scans ``plugins/context_engine/<name>/`` directories for context engine
|
||||
plugins. Each subdirectory must contain ``__init__.py`` with a class
|
||||
implementing the ContextEngine ABC.
|
||||
|
||||
Context engines are separate from the general plugin system — they live
|
||||
in the repo and are always available without user installation. Only ONE
|
||||
can be active at a time, selected via ``context.engine`` in config.yaml.
|
||||
The default engine is ``"compressor"`` (the built-in ContextCompressor).
|
||||
|
||||
Usage:
|
||||
from plugins.context_engine import discover_context_engines, load_context_engine
|
||||
|
||||
available = discover_context_engines() # [(name, desc, available), ...]
|
||||
engine = load_context_engine("lcm") # ContextEngine instance
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib
|
||||
import importlib.util
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_CONTEXT_ENGINE_PLUGINS_DIR = Path(__file__).parent
|
||||
|
||||
|
||||
def discover_context_engines() -> List[Tuple[str, str, bool]]:
|
||||
"""Scan plugins/context_engine/ for available engines.
|
||||
|
||||
Returns list of (name, description, is_available) tuples.
|
||||
Does NOT import the engines — just reads plugin.yaml for metadata
|
||||
and does a lightweight availability check.
|
||||
"""
|
||||
results = []
|
||||
if not _CONTEXT_ENGINE_PLUGINS_DIR.is_dir():
|
||||
return results
|
||||
|
||||
for child in sorted(_CONTEXT_ENGINE_PLUGINS_DIR.iterdir()):
|
||||
if not child.is_dir() or child.name.startswith(("_", ".")):
|
||||
continue
|
||||
init_file = child / "__init__.py"
|
||||
if not init_file.exists():
|
||||
continue
|
||||
|
||||
# Read description from plugin.yaml if available
|
||||
desc = ""
|
||||
yaml_file = child / "plugin.yaml"
|
||||
if yaml_file.exists():
|
||||
try:
|
||||
import yaml
|
||||
with open(yaml_file) as f:
|
||||
meta = yaml.safe_load(f) or {}
|
||||
desc = meta.get("description", "")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Quick availability check — try loading and calling is_available()
|
||||
available = True
|
||||
try:
|
||||
engine = _load_engine_from_dir(child)
|
||||
if engine is None:
|
||||
available = False
|
||||
elif hasattr(engine, "is_available"):
|
||||
available = engine.is_available()
|
||||
except Exception:
|
||||
available = False
|
||||
|
||||
results.append((child.name, desc, available))
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def load_context_engine(name: str) -> Optional["ContextEngine"]:
|
||||
"""Load and return a ContextEngine instance by name.
|
||||
|
||||
Returns None if the engine is not found or fails to load.
|
||||
"""
|
||||
engine_dir = _CONTEXT_ENGINE_PLUGINS_DIR / name
|
||||
if not engine_dir.is_dir():
|
||||
logger.debug("Context engine '%s' not found in %s", name, _CONTEXT_ENGINE_PLUGINS_DIR)
|
||||
return None
|
||||
|
||||
try:
|
||||
engine = _load_engine_from_dir(engine_dir)
|
||||
if engine:
|
||||
return engine
|
||||
logger.warning("Context engine '%s' loaded but no engine instance found", name)
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.warning("Failed to load context engine '%s': %s", name, e)
|
||||
return None
|
||||
|
||||
|
||||
def _load_engine_from_dir(engine_dir: Path) -> Optional["ContextEngine"]:
|
||||
"""Import an engine module and extract the ContextEngine instance.
|
||||
|
||||
The module must have either:
|
||||
- A register(ctx) function (plugin-style) — we simulate a ctx
|
||||
- A top-level class that extends ContextEngine — we instantiate it
|
||||
"""
|
||||
name = engine_dir.name
|
||||
module_name = f"plugins.context_engine.{name}"
|
||||
init_file = engine_dir / "__init__.py"
|
||||
|
||||
if not init_file.exists():
|
||||
return None
|
||||
|
||||
# Check if already loaded
|
||||
if module_name in sys.modules:
|
||||
mod = sys.modules[module_name]
|
||||
else:
|
||||
# Handle relative imports within the plugin
|
||||
# First ensure the parent packages are registered
|
||||
for parent in ("plugins", "plugins.context_engine"):
|
||||
if parent not in sys.modules:
|
||||
parent_path = Path(__file__).parent
|
||||
if parent == "plugins":
|
||||
parent_path = parent_path.parent
|
||||
parent_init = parent_path / "__init__.py"
|
||||
if parent_init.exists():
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
parent, str(parent_init),
|
||||
submodule_search_locations=[str(parent_path)]
|
||||
)
|
||||
if spec:
|
||||
parent_mod = importlib.util.module_from_spec(spec)
|
||||
sys.modules[parent] = parent_mod
|
||||
try:
|
||||
spec.loader.exec_module(parent_mod)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Now load the engine module
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
module_name, str(init_file),
|
||||
submodule_search_locations=[str(engine_dir)]
|
||||
)
|
||||
if not spec:
|
||||
return None
|
||||
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
sys.modules[module_name] = mod
|
||||
|
||||
# Register submodules so relative imports work
|
||||
for sub_file in engine_dir.glob("*.py"):
|
||||
if sub_file.name == "__init__.py":
|
||||
continue
|
||||
sub_name = sub_file.stem
|
||||
full_sub_name = f"{module_name}.{sub_name}"
|
||||
if full_sub_name not in sys.modules:
|
||||
sub_spec = importlib.util.spec_from_file_location(
|
||||
full_sub_name, str(sub_file)
|
||||
)
|
||||
if sub_spec:
|
||||
sub_mod = importlib.util.module_from_spec(sub_spec)
|
||||
sys.modules[full_sub_name] = sub_mod
|
||||
try:
|
||||
sub_spec.loader.exec_module(sub_mod)
|
||||
except Exception as e:
|
||||
logger.debug("Failed to load submodule %s: %s", full_sub_name, e)
|
||||
|
||||
try:
|
||||
spec.loader.exec_module(mod)
|
||||
except Exception as e:
|
||||
logger.debug("Failed to exec_module %s: %s", module_name, e)
|
||||
sys.modules.pop(module_name, None)
|
||||
return None
|
||||
|
||||
# Try register(ctx) pattern first (how plugins are written)
|
||||
if hasattr(mod, "register"):
|
||||
collector = _EngineCollector()
|
||||
try:
|
||||
mod.register(collector)
|
||||
if collector.engine:
|
||||
return collector.engine
|
||||
except Exception as e:
|
||||
logger.debug("register() failed for %s: %s", name, e)
|
||||
|
||||
# Fallback: find a ContextEngine subclass and instantiate it
|
||||
from agent.context_engine import ContextEngine
|
||||
for attr_name in dir(mod):
|
||||
attr = getattr(mod, attr_name, None)
|
||||
if (isinstance(attr, type) and issubclass(attr, ContextEngine)
|
||||
and attr is not ContextEngine):
|
||||
try:
|
||||
return attr()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
|
||||
class _EngineCollector:
|
||||
"""Fake plugin context that captures register_context_engine calls."""
|
||||
|
||||
def __init__(self):
|
||||
self.engine = None
|
||||
|
||||
def register_context_engine(self, engine):
|
||||
self.engine = engine
|
||||
|
||||
# No-op for other registration methods
|
||||
def register_tool(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def register_hook(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def register_cli_command(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def register_memory_provider(self, *args, **kwargs):
|
||||
pass
|
||||
259
run_agent.py
259
run_agent.py
@@ -1129,20 +1129,94 @@ class AIAgent:
|
||||
pass
|
||||
break
|
||||
|
||||
self.context_compressor = ContextCompressor(
|
||||
model=self.model,
|
||||
threshold_percent=compression_threshold,
|
||||
protect_first_n=3,
|
||||
protect_last_n=compression_protect_last,
|
||||
summary_target_ratio=compression_target_ratio,
|
||||
summary_model_override=compression_summary_model,
|
||||
quiet_mode=self.quiet_mode,
|
||||
base_url=self.base_url,
|
||||
api_key=getattr(self, "api_key", ""),
|
||||
config_context_length=_config_context_length,
|
||||
provider=self.provider,
|
||||
)
|
||||
# Select context engine: config-driven (like memory providers).
|
||||
# 1. Check config.yaml context.engine setting
|
||||
# 2. Check plugins/context_engine/<name>/ directory (repo-shipped)
|
||||
# 3. Check general plugin system (user-installed plugins)
|
||||
# 4. Fall back to built-in ContextCompressor
|
||||
_selected_engine = None
|
||||
_engine_name = "compressor" # default
|
||||
try:
|
||||
_ctx_cfg = _agent_cfg.get("context", {}) if isinstance(_agent_cfg, dict) else {}
|
||||
_engine_name = _ctx_cfg.get("engine", "compressor") or "compressor"
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if _engine_name != "compressor":
|
||||
# Try loading from plugins/context_engine/<name>/
|
||||
try:
|
||||
from plugins.context_engine import load_context_engine
|
||||
_selected_engine = load_context_engine(_engine_name)
|
||||
except Exception as _ce_load_err:
|
||||
logger.debug("Context engine load from plugins/context_engine/: %s", _ce_load_err)
|
||||
|
||||
# Try general plugin system as fallback
|
||||
if _selected_engine is None:
|
||||
try:
|
||||
from hermes_cli.plugins import get_plugin_context_engine
|
||||
_candidate = get_plugin_context_engine()
|
||||
if _candidate and _candidate.name == _engine_name:
|
||||
_selected_engine = _candidate
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if _selected_engine is None:
|
||||
logger.warning(
|
||||
"Context engine '%s' not found — falling back to built-in compressor",
|
||||
_engine_name,
|
||||
)
|
||||
else:
|
||||
# Even with default config, check if a plugin registered one
|
||||
try:
|
||||
from hermes_cli.plugins import get_plugin_context_engine
|
||||
_selected_engine = get_plugin_context_engine()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if _selected_engine is not None:
|
||||
self.context_compressor = _selected_engine
|
||||
if not self.quiet_mode:
|
||||
logger.info("Using context engine: %s", _selected_engine.name)
|
||||
else:
|
||||
self.context_compressor = ContextCompressor(
|
||||
model=self.model,
|
||||
threshold_percent=compression_threshold,
|
||||
protect_first_n=3,
|
||||
protect_last_n=compression_protect_last,
|
||||
summary_target_ratio=compression_target_ratio,
|
||||
summary_model_override=compression_summary_model,
|
||||
quiet_mode=self.quiet_mode,
|
||||
base_url=self.base_url,
|
||||
api_key=getattr(self, "api_key", ""),
|
||||
config_context_length=_config_context_length,
|
||||
provider=self.provider,
|
||||
)
|
||||
self.compression_enabled = compression_enabled
|
||||
|
||||
# Inject context engine tool schemas (e.g. lcm_grep, lcm_describe, lcm_expand)
|
||||
self._context_engine_tool_names: set = set()
|
||||
if hasattr(self, "context_compressor") and self.context_compressor and self.tools is not None:
|
||||
for _schema in self.context_compressor.get_tool_schemas():
|
||||
_wrapped = {"type": "function", "function": _schema}
|
||||
self.tools.append(_wrapped)
|
||||
_tname = _schema.get("name", "")
|
||||
if _tname:
|
||||
self.valid_tool_names.add(_tname)
|
||||
self._context_engine_tool_names.add(_tname)
|
||||
|
||||
# Notify context engine of session start
|
||||
if hasattr(self, "context_compressor") and self.context_compressor:
|
||||
try:
|
||||
self.context_compressor.on_session_start(
|
||||
self.session_id,
|
||||
hermes_home=str(get_hermes_home()),
|
||||
platform=self.platform or "cli",
|
||||
model=self.model,
|
||||
context_length=getattr(self.context_compressor, "context_length", 0),
|
||||
)
|
||||
except Exception as _ce_err:
|
||||
logger.debug("Context engine on_session_start: %s", _ce_err)
|
||||
|
||||
self._subdirectory_hints = SubdirectoryHintTracker(
|
||||
working_dir=os.getenv("TERMINAL_CWD") or None,
|
||||
)
|
||||
@@ -1208,11 +1282,13 @@ class AIAgent:
|
||||
"api_key": getattr(self, "api_key", ""),
|
||||
"client_kwargs": dict(self._client_kwargs),
|
||||
"use_prompt_caching": self._use_prompt_caching,
|
||||
# Compressor state that _try_activate_fallback() overwrites
|
||||
"compressor_model": _cc.model,
|
||||
"compressor_base_url": _cc.base_url,
|
||||
# Context engine state that _try_activate_fallback() overwrites.
|
||||
# Use getattr for model/base_url/api_key/provider since plugin
|
||||
# engines may not have these (they're ContextCompressor-specific).
|
||||
"compressor_model": getattr(_cc, "model", self.model),
|
||||
"compressor_base_url": getattr(_cc, "base_url", self.base_url),
|
||||
"compressor_api_key": getattr(_cc, "api_key", ""),
|
||||
"compressor_provider": _cc.provider,
|
||||
"compressor_provider": getattr(_cc, "provider", self.provider),
|
||||
"compressor_context_length": _cc.context_length,
|
||||
"compressor_threshold_tokens": _cc.threshold_tokens,
|
||||
}
|
||||
@@ -1258,16 +1334,9 @@ class AIAgent:
|
||||
# Turn counter (added after reset_session_state was first written — #2635)
|
||||
self._user_turn_count = 0
|
||||
|
||||
# Context compressor internal counters (if present)
|
||||
# Context engine reset (works for both built-in compressor and plugins)
|
||||
if hasattr(self, "context_compressor") and self.context_compressor:
|
||||
self.context_compressor.last_prompt_tokens = 0
|
||||
self.context_compressor.last_completion_tokens = 0
|
||||
self.context_compressor.last_total_tokens = 0
|
||||
self.context_compressor.compression_count = 0
|
||||
self.context_compressor._context_probed = False
|
||||
self.context_compressor._context_probe_persistable = False
|
||||
# Iterative summary from previous session must not bleed into new one (#2635)
|
||||
self.context_compressor._previous_summary = None
|
||||
self.context_compressor.on_session_reset()
|
||||
|
||||
def switch_model(self, new_model, new_provider, api_key='', base_url='', api_mode=''):
|
||||
"""Switch the model/provider in-place for a live agent.
|
||||
@@ -1347,13 +1416,12 @@ class AIAgent:
|
||||
api_key=self.api_key,
|
||||
provider=self.provider,
|
||||
)
|
||||
self.context_compressor.model = self.model
|
||||
self.context_compressor.base_url = self.base_url
|
||||
self.context_compressor.api_key = self.api_key
|
||||
self.context_compressor.provider = self.provider
|
||||
self.context_compressor.context_length = new_context_length
|
||||
self.context_compressor.threshold_tokens = int(
|
||||
new_context_length * self.context_compressor.threshold_percent
|
||||
self.context_compressor.update_model(
|
||||
model=self.model,
|
||||
context_length=new_context_length,
|
||||
base_url=self.base_url,
|
||||
api_key=getattr(self, "api_key", ""),
|
||||
provider=self.provider,
|
||||
)
|
||||
|
||||
# ── Invalidate cached system prompt so it rebuilds next turn ──
|
||||
@@ -1369,10 +1437,10 @@ class AIAgent:
|
||||
"api_key": getattr(self, "api_key", ""),
|
||||
"client_kwargs": dict(self._client_kwargs),
|
||||
"use_prompt_caching": self._use_prompt_caching,
|
||||
"compressor_model": _cc.model if _cc else self.model,
|
||||
"compressor_base_url": _cc.base_url if _cc else self.base_url,
|
||||
"compressor_model": getattr(_cc, "model", self.model) if _cc else self.model,
|
||||
"compressor_base_url": getattr(_cc, "base_url", self.base_url) if _cc else self.base_url,
|
||||
"compressor_api_key": getattr(_cc, "api_key", "") if _cc else "",
|
||||
"compressor_provider": _cc.provider if _cc else self.provider,
|
||||
"compressor_provider": getattr(_cc, "provider", self.provider) if _cc else self.provider,
|
||||
"compressor_context_length": _cc.context_length if _cc else 0,
|
||||
"compressor_threshold_tokens": _cc.threshold_tokens if _cc else 0,
|
||||
}
|
||||
@@ -2517,10 +2585,11 @@ class AIAgent:
|
||||
}
|
||||
|
||||
def shutdown_memory_provider(self, messages: list = None) -> None:
|
||||
"""Shut down the memory provider — call at actual session boundaries.
|
||||
"""Shut down the memory provider and context engine — call at actual session boundaries.
|
||||
|
||||
This calls on_session_end() then shutdown_all() on the memory
|
||||
manager. NOT called per-turn — only at CLI exit, /reset, gateway
|
||||
manager, and on_session_end() on the context engine.
|
||||
NOT called per-turn — only at CLI exit, /reset, gateway
|
||||
session expiry, etc.
|
||||
"""
|
||||
if self._memory_manager:
|
||||
@@ -2532,6 +2601,15 @@ class AIAgent:
|
||||
self._memory_manager.shutdown_all()
|
||||
except Exception:
|
||||
pass
|
||||
# Notify context engine of session end (flush DAG, close DBs, etc.)
|
||||
if hasattr(self, "context_compressor") and self.context_compressor:
|
||||
try:
|
||||
self.context_compressor.on_session_end(
|
||||
self.session_id or "",
|
||||
messages or [],
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _hydrate_todo_store(self, history: List[Dict[str, Any]]) -> None:
|
||||
"""
|
||||
@@ -4901,13 +4979,12 @@ class AIAgent:
|
||||
self.model, base_url=self.base_url,
|
||||
api_key=self.api_key, provider=self.provider,
|
||||
)
|
||||
self.context_compressor.model = self.model
|
||||
self.context_compressor.base_url = self.base_url
|
||||
self.context_compressor.api_key = self.api_key
|
||||
self.context_compressor.provider = self.provider
|
||||
self.context_compressor.context_length = fb_context_length
|
||||
self.context_compressor.threshold_tokens = int(
|
||||
fb_context_length * self.context_compressor.threshold_percent
|
||||
self.context_compressor.update_model(
|
||||
model=self.model,
|
||||
context_length=fb_context_length,
|
||||
base_url=self.base_url,
|
||||
api_key=getattr(self, "api_key", ""),
|
||||
provider=self.provider,
|
||||
)
|
||||
|
||||
self._emit_status(
|
||||
@@ -4967,14 +5044,15 @@ class AIAgent:
|
||||
shared=True,
|
||||
)
|
||||
|
||||
# ── Restore context compressor state ──
|
||||
# ── Restore context engine state ──
|
||||
cc = self.context_compressor
|
||||
cc.model = rt["compressor_model"]
|
||||
cc.base_url = rt["compressor_base_url"]
|
||||
cc.api_key = rt["compressor_api_key"]
|
||||
cc.provider = rt["compressor_provider"]
|
||||
cc.context_length = rt["compressor_context_length"]
|
||||
cc.threshold_tokens = rt["compressor_threshold_tokens"]
|
||||
cc.update_model(
|
||||
model=rt["compressor_model"],
|
||||
context_length=rt["compressor_context_length"],
|
||||
base_url=rt["compressor_base_url"],
|
||||
api_key=rt["compressor_api_key"],
|
||||
provider=rt["compressor_provider"],
|
||||
)
|
||||
|
||||
# ── Reset fallback chain for the new turn ──
|
||||
self._fallback_activated = False
|
||||
@@ -6416,6 +6494,29 @@ class AIAgent:
|
||||
spinner.stop(cute_msg)
|
||||
elif self.quiet_mode:
|
||||
self._vprint(f" {cute_msg}")
|
||||
elif self._context_engine_tool_names and function_name in self._context_engine_tool_names:
|
||||
# Context engine tools (lcm_grep, lcm_describe, lcm_expand, etc.)
|
||||
spinner = None
|
||||
if self.quiet_mode and not self.tool_progress_callback:
|
||||
face = random.choice(KawaiiSpinner.KAWAII_WAITING)
|
||||
emoji = _get_tool_emoji(function_name)
|
||||
preview = _build_tool_preview(function_name, function_args) or function_name
|
||||
spinner = KawaiiSpinner(f"{face} {emoji} {preview}", spinner_type='dots', print_fn=self._print_fn)
|
||||
spinner.start()
|
||||
_ce_result = None
|
||||
try:
|
||||
function_result = self.context_compressor.handle_tool_call(function_name, function_args, messages=messages)
|
||||
_ce_result = function_result
|
||||
except Exception as tool_error:
|
||||
function_result = json.dumps({"error": f"Context engine tool '{function_name}' failed: {tool_error}"})
|
||||
logger.error("context_engine.handle_tool_call raised for %s: %s", function_name, tool_error, exc_info=True)
|
||||
finally:
|
||||
tool_duration = time.time() - tool_start_time
|
||||
cute_msg = _get_cute_tool_message_impl(function_name, function_args, tool_duration, result=_ce_result)
|
||||
if spinner:
|
||||
spinner.stop(cute_msg)
|
||||
elif self.quiet_mode:
|
||||
self._vprint(f" {cute_msg}")
|
||||
elif self._memory_manager and self._memory_manager.has_tool(function_name):
|
||||
# Memory provider tools (hindsight_retain, honcho_search, etc.)
|
||||
# These are not in the tool registry — route through MemoryManager.
|
||||
@@ -7693,7 +7794,7 @@ class AIAgent:
|
||||
# Cache discovered context length after successful call.
|
||||
# Only persist limits confirmed by the provider (parsed
|
||||
# from the error message), not guessed probe tiers.
|
||||
if self.context_compressor._context_probed:
|
||||
if getattr(self.context_compressor, "_context_probed", False):
|
||||
ctx = self.context_compressor.context_length
|
||||
if getattr(self.context_compressor, "_context_probe_persistable", False):
|
||||
save_context_length(self.model, self.base_url, ctx)
|
||||
@@ -7978,16 +8079,22 @@ class AIAgent:
|
||||
compressor = self.context_compressor
|
||||
old_ctx = compressor.context_length
|
||||
if old_ctx > _reduced_ctx:
|
||||
compressor.context_length = _reduced_ctx
|
||||
compressor.threshold_tokens = int(
|
||||
_reduced_ctx * compressor.threshold_percent
|
||||
compressor.update_model(
|
||||
model=self.model,
|
||||
context_length=_reduced_ctx,
|
||||
base_url=self.base_url,
|
||||
api_key=getattr(self, "api_key", ""),
|
||||
provider=self.provider,
|
||||
)
|
||||
compressor._context_probed = True
|
||||
# Don't persist — this is a subscription-tier
|
||||
# limitation, not a model capability. If the user
|
||||
# later enables extra usage the 1M limit should
|
||||
# come back automatically.
|
||||
compressor._context_probe_persistable = False
|
||||
# Context probing flags — only set on built-in
|
||||
# compressor (plugin engines manage their own).
|
||||
if hasattr(compressor, "_context_probed"):
|
||||
compressor._context_probed = True
|
||||
# Don't persist — this is a subscription-tier
|
||||
# limitation, not a model capability. If the
|
||||
# user later enables extra usage the 1M limit
|
||||
# should come back automatically.
|
||||
compressor._context_probe_persistable = False
|
||||
self._vprint(
|
||||
f"{self.log_prefix}⚠️ Anthropic long-context tier "
|
||||
f"requires extra usage — reducing context: "
|
||||
@@ -8160,17 +8267,25 @@ class AIAgent:
|
||||
new_ctx = get_next_probe_tier(old_ctx)
|
||||
|
||||
if new_ctx and new_ctx < old_ctx:
|
||||
compressor.context_length = new_ctx
|
||||
compressor.threshold_tokens = int(new_ctx * compressor.threshold_percent)
|
||||
compressor._context_probed = True
|
||||
# Only persist limits parsed from the provider's
|
||||
# error message (a real number). Guessed fallback
|
||||
# tiers from get_next_probe_tier() should stay
|
||||
# in-memory only — persisting them pollutes the
|
||||
# cache with wrong values.
|
||||
compressor._context_probe_persistable = bool(
|
||||
parsed_limit and parsed_limit == new_ctx
|
||||
compressor.update_model(
|
||||
model=self.model,
|
||||
context_length=new_ctx,
|
||||
base_url=self.base_url,
|
||||
api_key=getattr(self, "api_key", ""),
|
||||
provider=self.provider,
|
||||
)
|
||||
# Context probing flags — only set on built-in
|
||||
# compressor (plugin engines manage their own).
|
||||
if hasattr(compressor, "_context_probed"):
|
||||
compressor._context_probed = True
|
||||
# Only persist limits parsed from the provider's
|
||||
# error message (a real number). Guessed fallback
|
||||
# tiers from get_next_probe_tier() should stay
|
||||
# in-memory only — persisting them pollutes the
|
||||
# cache with wrong values.
|
||||
compressor._context_probe_persistable = bool(
|
||||
parsed_limit and parsed_limit == new_ctx
|
||||
)
|
||||
self._vprint(f"{self.log_prefix}⚠️ Context length exceeded — stepping down: {old_ctx:,} → {new_ctx:,} tokens", force=True)
|
||||
else:
|
||||
self._vprint(f"{self.log_prefix}⚠️ Context length exceeded at minimum tier — attempting compression...", force=True)
|
||||
|
||||
250
tests/agent/test_context_engine.py
Normal file
250
tests/agent/test_context_engine.py
Normal file
@@ -0,0 +1,250 @@
|
||||
"""Tests for the ContextEngine ABC and plugin slot."""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from agent.context_engine import ContextEngine
|
||||
from agent.context_compressor import ContextCompressor
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# A minimal concrete engine for testing the ABC
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class StubEngine(ContextEngine):
|
||||
"""Minimal engine that satisfies the ABC without doing real work."""
|
||||
|
||||
def __init__(self, context_length=200000, threshold_pct=0.50):
|
||||
self.context_length = context_length
|
||||
self.threshold_tokens = int(context_length * threshold_pct)
|
||||
self._compress_called = False
|
||||
self._tools_called = []
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "stub"
|
||||
|
||||
def update_from_response(self, usage: Dict[str, Any]) -> None:
|
||||
self.last_prompt_tokens = usage.get("prompt_tokens", 0)
|
||||
self.last_completion_tokens = usage.get("completion_tokens", 0)
|
||||
self.last_total_tokens = usage.get("total_tokens", 0)
|
||||
|
||||
def should_compress(self, prompt_tokens: int = None) -> bool:
|
||||
tokens = prompt_tokens if prompt_tokens is not None else self.last_prompt_tokens
|
||||
return tokens >= self.threshold_tokens
|
||||
|
||||
def compress(self, messages: List[Dict[str, Any]], current_tokens: int = None) -> List[Dict[str, Any]]:
|
||||
self._compress_called = True
|
||||
self.compression_count += 1
|
||||
# Trivial: just return as-is
|
||||
return messages
|
||||
|
||||
def get_tool_schemas(self) -> List[Dict[str, Any]]:
|
||||
return [
|
||||
{
|
||||
"name": "stub_search",
|
||||
"description": "Search the stub engine",
|
||||
"parameters": {"type": "object", "properties": {}},
|
||||
}
|
||||
]
|
||||
|
||||
def handle_tool_call(self, name: str, args: Dict[str, Any]) -> str:
|
||||
self._tools_called.append(name)
|
||||
return json.dumps({"ok": True, "tool": name})
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ABC contract tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestContextEngineABC:
|
||||
"""Verify the ABC enforces the required interface."""
|
||||
|
||||
def test_cannot_instantiate_abc_directly(self):
|
||||
with pytest.raises(TypeError):
|
||||
ContextEngine()
|
||||
|
||||
def test_missing_methods_raises(self):
|
||||
"""A subclass missing required methods cannot be instantiated."""
|
||||
class Incomplete(ContextEngine):
|
||||
@property
|
||||
def name(self):
|
||||
return "incomplete"
|
||||
with pytest.raises(TypeError):
|
||||
Incomplete()
|
||||
|
||||
def test_stub_engine_satisfies_abc(self):
|
||||
engine = StubEngine()
|
||||
assert isinstance(engine, ContextEngine)
|
||||
assert engine.name == "stub"
|
||||
|
||||
def test_compressor_is_context_engine(self):
|
||||
c = ContextCompressor(model="test", quiet_mode=True, config_context_length=200000)
|
||||
assert isinstance(c, ContextEngine)
|
||||
assert c.name == "compressor"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Default method behavior
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDefaults:
|
||||
"""Verify ABC default implementations work correctly."""
|
||||
|
||||
def test_default_tool_schemas_empty(self):
|
||||
engine = StubEngine()
|
||||
# StubEngine overrides this, so test the base via super
|
||||
assert ContextEngine.get_tool_schemas(engine) == []
|
||||
|
||||
def test_default_handle_tool_call_returns_error(self):
|
||||
engine = StubEngine()
|
||||
result = ContextEngine.handle_tool_call(engine, "unknown", {})
|
||||
data = json.loads(result)
|
||||
assert "error" in data
|
||||
|
||||
def test_default_get_status(self):
|
||||
engine = StubEngine()
|
||||
engine.last_prompt_tokens = 50000
|
||||
status = engine.get_status()
|
||||
assert status["last_prompt_tokens"] == 50000
|
||||
assert status["context_length"] == 200000
|
||||
assert status["threshold_tokens"] == 100000
|
||||
assert 0 < status["usage_percent"] <= 100
|
||||
|
||||
def test_on_session_reset(self):
|
||||
engine = StubEngine()
|
||||
engine.last_prompt_tokens = 999
|
||||
engine.compression_count = 3
|
||||
engine.on_session_reset()
|
||||
assert engine.last_prompt_tokens == 0
|
||||
assert engine.compression_count == 0
|
||||
|
||||
def test_should_compress_preflight_default_false(self):
|
||||
engine = StubEngine()
|
||||
assert engine.should_compress_preflight([]) is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# StubEngine behavior
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestStubEngine:
|
||||
|
||||
def test_should_compress(self):
|
||||
engine = StubEngine(context_length=100000, threshold_pct=0.50)
|
||||
assert not engine.should_compress(40000)
|
||||
assert engine.should_compress(50000)
|
||||
assert engine.should_compress(60000)
|
||||
|
||||
def test_compress_tracks_count(self):
|
||||
engine = StubEngine()
|
||||
msgs = [{"role": "user", "content": "hello"}]
|
||||
result = engine.compress(msgs)
|
||||
assert result == msgs
|
||||
assert engine._compress_called
|
||||
assert engine.compression_count == 1
|
||||
|
||||
def test_tool_schemas(self):
|
||||
engine = StubEngine()
|
||||
schemas = engine.get_tool_schemas()
|
||||
assert len(schemas) == 1
|
||||
assert schemas[0]["name"] == "stub_search"
|
||||
|
||||
def test_handle_tool_call(self):
|
||||
engine = StubEngine()
|
||||
result = engine.handle_tool_call("stub_search", {})
|
||||
assert json.loads(result)["ok"] is True
|
||||
assert "stub_search" in engine._tools_called
|
||||
|
||||
def test_update_from_response(self):
|
||||
engine = StubEngine()
|
||||
engine.update_from_response({"prompt_tokens": 1000, "completion_tokens": 200, "total_tokens": 1200})
|
||||
assert engine.last_prompt_tokens == 1000
|
||||
assert engine.last_completion_tokens == 200
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ContextCompressor session reset via ABC
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCompressorSessionReset:
|
||||
"""Verify ContextCompressor.on_session_reset() clears all state."""
|
||||
|
||||
def test_reset_clears_state(self):
|
||||
c = ContextCompressor(model="test", quiet_mode=True, config_context_length=200000)
|
||||
c.last_prompt_tokens = 50000
|
||||
c.compression_count = 3
|
||||
c._previous_summary = "some old summary"
|
||||
c._context_probed = True
|
||||
c._context_probe_persistable = True
|
||||
|
||||
c.on_session_reset()
|
||||
|
||||
assert c.last_prompt_tokens == 0
|
||||
assert c.last_completion_tokens == 0
|
||||
assert c.last_total_tokens == 0
|
||||
assert c.compression_count == 0
|
||||
assert c._context_probed is False
|
||||
assert c._context_probe_persistable is False
|
||||
assert c._previous_summary is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Plugin slot (PluginManager integration)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestPluginContextEngineSlot:
|
||||
"""Test register_context_engine on PluginContext."""
|
||||
|
||||
def test_register_engine(self):
|
||||
from hermes_cli.plugins import PluginManager, PluginContext, PluginManifest
|
||||
mgr = PluginManager()
|
||||
manifest = PluginManifest(name="test-lcm")
|
||||
ctx = PluginContext(manifest, mgr)
|
||||
|
||||
engine = StubEngine()
|
||||
ctx.register_context_engine(engine)
|
||||
|
||||
assert mgr._context_engine is engine
|
||||
assert mgr._context_engine.name == "stub"
|
||||
|
||||
def test_reject_second_engine(self):
|
||||
from hermes_cli.plugins import PluginManager, PluginContext, PluginManifest
|
||||
mgr = PluginManager()
|
||||
manifest = PluginManifest(name="test-lcm")
|
||||
ctx = PluginContext(manifest, mgr)
|
||||
|
||||
engine1 = StubEngine()
|
||||
engine2 = StubEngine()
|
||||
ctx.register_context_engine(engine1)
|
||||
ctx.register_context_engine(engine2) # should be rejected
|
||||
|
||||
assert mgr._context_engine is engine1
|
||||
|
||||
def test_reject_non_engine(self):
|
||||
from hermes_cli.plugins import PluginManager, PluginContext, PluginManifest
|
||||
mgr = PluginManager()
|
||||
manifest = PluginManifest(name="test-bad")
|
||||
ctx = PluginContext(manifest, mgr)
|
||||
|
||||
ctx.register_context_engine("not an engine")
|
||||
assert mgr._context_engine is None
|
||||
|
||||
def test_get_plugin_context_engine(self):
|
||||
from hermes_cli.plugins import PluginManager, PluginContext, PluginManifest, get_plugin_context_engine, _plugin_manager
|
||||
import hermes_cli.plugins as plugins_mod
|
||||
|
||||
# Inject a test manager
|
||||
old_mgr = plugins_mod._plugin_manager
|
||||
try:
|
||||
mgr = PluginManager()
|
||||
plugins_mod._plugin_manager = mgr
|
||||
|
||||
assert get_plugin_context_engine() is None
|
||||
|
||||
engine = StubEngine()
|
||||
mgr._context_engine = engine
|
||||
assert get_plugin_context_engine() is engine
|
||||
finally:
|
||||
plugins_mod._plugin_manager = old_mgr
|
||||
Reference in New Issue
Block a user