From ea01bdcebe1fd2e297923626de74dbe529c47bf4 Mon Sep 17 00:00:00 2001
From: Teknium <127238744+teknium1@users.noreply.github.com>
Date: Sat, 25 Apr 2026 08:21:14 -0700
Subject: [PATCH] refactor(memory): remove flush_memories entirely (#15696)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The AIAgent.flush_memories pre-compression save, the gateway
_flush_memories_for_session, and everything feeding them are obsolete
now that the background memory/skill review handles persistent memory
extraction.

Problems with flush_memories:

- Pre-dates the background review loop. It was the only memory-save
  path when introduced; the background review now fires every 10 user
  turns on CLI and gateway alike, which is far more often than
  compression or session reset ever triggered a flush.
- Blocking and synchronous. Pre-compression flush ran on the live agent
  before compression, blocking the user-visible response.
- Cache-breaking. Flush built a temporary conversation prefix (system
  prompt + memory-only tool list) that diverged from the live
  conversation's cached prefix, invalidating prompt caching. The
  gateway variant spawned a fresh AIAgent with its own clean prompt for
  each finalized session — still cache-breaking, just in a different
  process.
- Redundant. Background review runs in the live conversation's session
  context, gets the same content, writes to the same memory store, and
  doesn't break the cache. Everything flush_memories claimed to
  preserve is already covered.
What this removes:

- AIAgent.flush_memories() method (~248 LOC in run_agent.py)
- Pre-compression flush call in _compress_context
- flush_memories call sites in cli.py (/new + exit)
- GatewayRunner._flush_memories_for_session + _async_flush_memories
  (and the 3 call sites: session expiry watcher, /new, /resume)
- 'flush_memories' entry from DEFAULT_CONFIG auxiliary tasks, hermes
  tools UI task list, auxiliary_client docstrings
- _memory_flush_min_turns config + init
- #15631's headroom-deduction math in
  _check_compression_model_feasibility (headroom was only needed
  because flush dragged the full main-agent system prompt along; the
  compression summariser sends a single user-role prompt so
  new_threshold = aux_context is safe again)
- The dedicated test files and assertions that exercised
  flush-specific paths

What this renames (with read-time backward compatibility for
sessions.json):

- SessionEntry.memory_flushed -> SessionEntry.expiry_finalized. The
  session-expiry watcher still uses the flag to avoid re-running
  finalize/eviction on the same expired session; the new name reflects
  what it now actually gates. from_dict() reads 'expiry_finalized'
  first and falls back to the legacy 'memory_flushed' key, so existing
  sessions.json files upgrade seamlessly.

Supersedes #15631 and #15638.

Tested: 383 targeted tests pass across run_agent/, agent/, cli/, and
gateway/ session-boundary suites. No behavior regressions — background
memory review continues to handle persistent memory extraction on both
CLI and gateway.
--- agent/anthropic_adapter.py | 6 +- agent/auxiliary_client.py | 8 +- cli.py | 10 - gateway/run.py | 215 ++-------- gateway/session.py | 14 +- hermes_cli/config.py | 8 - hermes_cli/main.py | 1 - plugins/memory/openviking/__init__.py | 2 +- run_agent.py | 273 +----------- .../test_auxiliary_named_custom_providers.py | 6 +- .../test_unsupported_temperature_retry.py | 8 +- tests/cli/test_cli_new_session.py | 2 - tests/gateway/test_async_memory_flush.py | 249 ----------- .../gateway/test_flush_memory_stale_guard.py | 240 ----------- tests/gateway/test_resume_command.py | 30 +- tests/gateway/test_session_boundary_hooks.py | 20 +- .../test_session_boundary_security_state.py | 3 +- .../test_compress_focus_plugin_fallback.py | 1 - .../run_agent/test_compression_feasibility.py | 95 +---- tests/run_agent/test_flush_memories_codex.py | 398 ------------------ tests/run_agent/test_run_agent.py | 42 -- website/docs/user-guide/configuration.md | 8 - .../user-guide/features/fallback-providers.md | 6 - 23 files changed, 78 insertions(+), 1567 deletions(-) delete mode 100644 tests/gateway/test_async_memory_flush.py delete mode 100644 tests/gateway/test_flush_memory_stale_guard.py delete mode 100644 tests/run_agent/test_flush_memories_codex.py diff --git a/agent/anthropic_adapter.py b/agent/anthropic_adapter.py index 01fb8e48be..96903875b1 100644 --- a/agent/anthropic_adapter.py +++ b/agent/anthropic_adapter.py @@ -1680,9 +1680,9 @@ def build_anthropic_kwargs( # ── Strip sampling params on 4.7+ ───────────────────────────────── # Opus 4.7 rejects any non-default temperature/top_p/top_k with a 400. - # Callers (auxiliary_client, flush_memories, etc.) may set these for - # older models; drop them here as a safety net so upstream 4.6 → 4.7 - # migrations don't require coordinated edits everywhere. + # Callers (auxiliary_client, etc.) may set these for older models; + # drop them here as a safety net so upstream 4.6 → 4.7 migrations + # don't require coordinated edits everywhere. 
if _forbids_sampling_params(model): for _sampling_key in ("temperature", "top_p", "top_k"): kwargs.pop(_sampling_key, None) diff --git a/agent/auxiliary_client.py b/agent/auxiliary_client.py index 1e1931d0d6..3cad451136 100644 --- a/agent/auxiliary_client.py +++ b/agent/auxiliary_client.py @@ -390,7 +390,7 @@ class _CodexCompletionsAdapter: # Note: the Codex endpoint (chatgpt.com/backend-api/codex) does NOT # support max_output_tokens or temperature — omit to avoid 400 errors. - # Tools support for flush_memories and similar callers + # Tools support for auxiliary callers (e.g. skills_hub) that pass function schemas tools = kwargs.get("tools") if tools: converted = [] @@ -2803,8 +2803,8 @@ def _build_call_kwargs( temperature = fixed_temperature # Opus 4.7+ rejects any non-default temperature/top_p/top_k — silently - # drop here so auxiliary callers that hardcode temperature (e.g. 0.3 on - # flush_memories, 0 on structured-JSON extraction) don't 400 the moment + # drop here so auxiliary callers that hardcode temperature (e.g. 0 on + # structured-JSON extraction) don't 400 the moment # the aux model is flipped to 4.7. if temperature is not None: from agent.anthropic_adapter import _forbids_sampling_params @@ -2892,7 +2892,7 @@ def call_llm( Args: task: Auxiliary task name ("compression", "vision", "web_extract", - "session_search", "skills_hub", "mcp", "flush_memories"). + "session_search", "skills_hub", "mcp", "title_generation"). Reads provider:model from config/env. Ignored if provider is set. provider: Explicit provider override. model: Explicit model override. 
diff --git a/cli.py b/cli.py index 424594acf8..18c9e637f6 100644 --- a/cli.py +++ b/cli.py @@ -4668,10 +4668,6 @@ class HermesCLI: def new_session(self, silent=False): """Start a fresh session with a new session ID and cleared agent state.""" if self.agent and self.conversation_history: - try: - self.agent.flush_memories(self.conversation_history) - except (Exception, KeyboardInterrupt): - pass # Trigger memory extraction on the old session before session_id rotates. self.agent.commit_memory_session(self.conversation_history) self._notify_session_boundary("on_session_finalize") @@ -10788,12 +10784,6 @@ class HermesCLI: self.agent.interrupt() except Exception: pass - # Flush memories before exit (only for substantial conversations) - if self.agent and self.conversation_history: - try: - self.agent.flush_memories(self.conversation_history) - except (Exception, KeyboardInterrupt): - pass # Shut down voice recorder (release persistent audio stream) if hasattr(self, '_voice_recorder') and self._voice_recorder: try: diff --git a/gateway/run.py b/gateway/run.py index f5c1858dbf..bd0c5301fb 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -524,7 +524,7 @@ def _load_gateway_config() -> dict: def _resolve_gateway_model(config: dict | None = None) -> str: """Read model from config.yaml — single source of truth. - Without this, temporary AIAgent instances (memory flush, /compress) fall + Without this, temporary AIAgent instances (e.g. /compress) fall back to the hardcoded default which fails when the active provider is openai-codex. """ @@ -915,129 +915,6 @@ class GatewayRunner: e, ) - # ----------------------------------------------------------------- - - def _flush_memories_for_session( - self, - old_session_id: str, - session_key: Optional[str] = None, - ): - """Prompt the agent to save memories/skills before context is lost. - - Synchronous worker — meant to be called via run_in_executor from - an async context so it doesn't block the event loop. 
- """ - # Skip cron sessions — they run headless with no meaningful user - # conversation to extract memories from. - if old_session_id and old_session_id.startswith("cron_"): - logger.debug("Skipping memory flush for cron session: %s", old_session_id) - return - - try: - history = self.session_store.load_transcript(old_session_id) - if not history or len(history) < 4: - return - - from run_agent import AIAgent - model, runtime_kwargs = self._resolve_session_agent_runtime( - session_key=session_key, - ) - if not runtime_kwargs.get("api_key"): - return - - tmp_agent = AIAgent( - **runtime_kwargs, - model=model, - max_iterations=8, - quiet_mode=True, - skip_memory=True, # Flush agent — no memory provider - enabled_toolsets=["memory", "skills"], - session_id=old_session_id, - ) - try: - # Fully silence the flush agent — quiet_mode only suppresses init - # messages; tool call output still leaks to the terminal through - # _safe_print → _print_fn. Set a no-op to prevent that. - tmp_agent._print_fn = lambda *a, **kw: None - - # Build conversation history from transcript - msgs = [ - {"role": m.get("role"), "content": m.get("content")} - for m in history - if m.get("role") in ("user", "assistant") and m.get("content") - ] - - # Read live memory state from disk so the flush agent can see - # what's already saved and avoid overwriting newer entries. 
- _current_memory = "" - try: - from tools.memory_tool import get_memory_dir - _mem_dir = get_memory_dir() - for fname, label in [ - ("MEMORY.md", "MEMORY (your personal notes)"), - ("USER.md", "USER PROFILE (who the user is)"), - ]: - fpath = _mem_dir / fname - if fpath.exists(): - content = fpath.read_text(encoding="utf-8").strip() - if content: - _current_memory += f"\n\n## Current {label}:\n{content}" - except Exception: - pass # Non-fatal — flush still works, just without the guard - - # Give the agent a real turn to think about what to save - flush_prompt = ( - "[System: This session is about to be automatically reset due to " - "inactivity or a scheduled daily reset. The conversation context " - "will be cleared after this turn.\n\n" - "Review the conversation above and:\n" - "1. Save any important facts, preferences, or decisions to memory " - "(user profile or your notes) that would be useful in future sessions.\n" - "2. If you discovered a reusable workflow or solved a non-trivial " - "problem, consider saving it as a skill.\n" - "3. If nothing is worth saving, that's fine — just skip.\n\n" - ) - - if _current_memory: - flush_prompt += ( - "IMPORTANT — here is the current live state of memory. Other " - "sessions, cron jobs, or the user may have updated it since this " - "conversation ended. Do NOT overwrite or remove entries unless " - "the conversation above reveals something that genuinely " - "supersedes them. Only add new information that is not already " - "captured below." - f"{_current_memory}\n\n" - ) - - flush_prompt += ( - "Do NOT respond to the user. 
Just use the memory and skill_manage " - "tools if needed, then stop.]" - ) - - tmp_agent.run_conversation( - user_message=flush_prompt, - conversation_history=msgs, - ) - finally: - self._cleanup_agent_resources(tmp_agent) - logger.info("Pre-reset memory flush completed for session %s", old_session_id) - except Exception as e: - logger.debug("Pre-reset memory flush failed for session %s: %s", old_session_id, e) - - async def _async_flush_memories( - self, - old_session_id: str, - session_key: Optional[str] = None, - ): - """Run the sync memory flush in a thread pool so it won't block the event loop.""" - loop = asyncio.get_running_loop() - await loop.run_in_executor( - None, - self._flush_memories_for_session, - old_session_id, - session_key, - ) - @property def should_exit_cleanly(self) -> bool: return self._exit_cleanly @@ -2272,7 +2149,7 @@ class GatewayRunner: except Exception as e: logger.error("Recovered watcher setup error: %s", e) - # Start background session expiry watcher for proactive memory flushing + # Start background session expiry watcher to finalize expired sessions asyncio.create_task(self._session_expiry_watcher()) # Start background reconnection watcher for platforms that failed at startup @@ -2289,25 +2166,24 @@ class GatewayRunner: return True async def _session_expiry_watcher(self, interval: int = 300): - """Background task that proactively flushes memories for expired sessions. - - Runs every `interval` seconds (default 5 min). For each session that - has expired according to its reset policy, flushes memories in a thread - pool and marks the session so it won't be flushed again. + """Background task that finalizes expired sessions. - This means memories are already saved by the time the user sends their - next message, so there's no blocking delay. + Runs every ``interval`` seconds (default 5 min). 
For each session + whose reset policy has expired, invokes ``on_session_finalize`` + hooks, cleans up the cached AIAgent's tool resources, evicts the + cache entry so it can be garbage-collected, and marks the session + so it won't be finalized again. """ await asyncio.sleep(60) # initial delay — let the gateway fully start - _flush_failures: dict[str, int] = {} # session_id -> consecutive failure count - _MAX_FLUSH_RETRIES = 3 + _finalize_failures: dict[str, int] = {} # session_id -> consecutive failure count + _MAX_FINALIZE_RETRIES = 3 while self._running: try: self.session_store._ensure_loaded() # Collect expired sessions first, then log a single summary. _expired_entries = [] for key, entry in list(self.session_store._entries.items()): - if entry.memory_flushed: + if entry.expiry_finalized: continue if not self.session_store._is_session_expired(entry): continue @@ -2325,13 +2201,12 @@ class GatewayRunner: f"{p}:{c}" for p, c in sorted(_platforms.items()) ) logger.info( - "Session expiry: %d sessions to flush (%s)", + "Session expiry: %d sessions to finalize (%s)", len(_expired_entries), _plat_summary, ) for key, entry in _expired_entries: try: - await self._async_flush_memories(entry.session_id, key) try: from hermes_cli.plugins import invoke_hook as _invoke_hook _parts = key.split(":") @@ -2363,48 +2238,48 @@ class GatewayRunner: # be garbage-collected. Otherwise the cache grows # unbounded across the gateway's lifetime. self._evict_cached_agent(key) - # Mark as flushed and persist to disk so the flag + # Mark as finalized and persist to disk so the flag # survives gateway restarts. 
with self.session_store._lock: - entry.memory_flushed = True + entry.expiry_finalized = True self.session_store._save() logger.debug( - "Memory flush completed for session %s", + "Session expiry finalized for %s", entry.session_id, ) - _flush_failures.pop(entry.session_id, None) + _finalize_failures.pop(entry.session_id, None) except Exception as e: - failures = _flush_failures.get(entry.session_id, 0) + 1 - _flush_failures[entry.session_id] = failures - if failures >= _MAX_FLUSH_RETRIES: + failures = _finalize_failures.get(entry.session_id, 0) + 1 + _finalize_failures[entry.session_id] = failures + if failures >= _MAX_FINALIZE_RETRIES: logger.warning( - "Memory flush gave up after %d attempts for %s: %s. " - "Marking as flushed to prevent infinite retry loop.", + "Session finalize gave up after %d attempts for %s: %s. " + "Marking as finalized to prevent infinite retry loop.", failures, entry.session_id, e, ) with self.session_store._lock: - entry.memory_flushed = True + entry.expiry_finalized = True self.session_store._save() - _flush_failures.pop(entry.session_id, None) + _finalize_failures.pop(entry.session_id, None) else: logger.debug( - "Memory flush failed (%d/%d) for %s: %s", - failures, _MAX_FLUSH_RETRIES, entry.session_id, e, + "Session finalize failed (%d/%d) for %s: %s", + failures, _MAX_FINALIZE_RETRIES, entry.session_id, e, ) if _expired_entries: - _flushed = sum( - 1 for _, e in _expired_entries if e.memory_flushed + _done = sum( + 1 for _, e in _expired_entries if e.expiry_finalized ) - _failed = len(_expired_entries) - _flushed + _failed = len(_expired_entries) - _done if _failed: logger.info( - "Session expiry done: %d flushed, %d pending retry", - _flushed, _failed, + "Session expiry done: %d finalized, %d pending retry", + _done, _failed, ) else: logger.info( - "Session expiry done: %d flushed", _flushed, + "Session expiry done: %d finalized", _done, ) # Sweep agents that have been idle beyond the TTL regardless @@ -5021,19 +4896,11 @@ class 
GatewayRunner: # Get existing session key session_key = self._session_key_for_source(source) self._invalidate_session_run_generation(session_key, reason="session_reset") - - # Flush memories in the background (fire-and-forget) so the user - # gets the "Session reset!" response immediately. - try: - old_entry = self.session_store._entries.get(session_key) - if old_entry: - _flush_task = asyncio.create_task( - self._async_flush_memories(old_entry.session_id, session_key) - ) - self._background_tasks.add(_flush_task) - _flush_task.add_done_callback(self._background_tasks.discard) - except Exception as e: - logger.debug("Gateway memory flush on reset failed: %s", e) + + # Snapshot the old entry so on_session_finalize can report the + # expiring session id before reset_session() rotates it. + old_entry = self.session_store._entries.get(session_key) + # Close tool resources on the old agent (terminal sandboxes, browser # daemons, background processes) before evicting from cache. # Guard with getattr because test fixtures may skip __init__. @@ -7252,16 +7119,6 @@ class GatewayRunner: if current_entry.session_id == target_id: return f"📌 Already on session **{name}**." 
- # Flush memories for current session before switching - try: - _flush_task = asyncio.create_task( - self._async_flush_memories(current_entry.session_id, session_key) - ) - self._background_tasks.add(_flush_task) - _flush_task.add_done_callback(self._background_tasks.discard) - except Exception as e: - logger.debug("Memory flush on resume failed: %s", e) - # Clear any running agent for this session key self._release_running_agent_state(session_key) diff --git a/gateway/session.py b/gateway/session.py index 8044c5c2c6..f677432baf 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -439,11 +439,11 @@ class SessionEntry: auto_reset_reason: Optional[str] = None # "idle" or "daily" reset_had_activity: bool = False # whether the expired session had any messages - # Set by the background expiry watcher after it successfully flushes - # memories for this session. Persisted to sessions.json so the flag - # survives gateway restarts (the old in-memory _pre_flushed_sessions - # set was lost on restart, causing redundant re-flushes). - memory_flushed: bool = False + # Set by the background expiry watcher after it finalizes an expired + # session (invoking on_session_finalize hooks and evicting the cached + # agent). Persisted to sessions.json so the flag survives gateway + # restarts — prevents redundant finalization runs. + expiry_finalized: bool = False # When True the next call to get_or_create_session() will auto-reset # this session (create a new session_id) so the user starts fresh. 
@@ -479,7 +479,7 @@ class SessionEntry: "last_prompt_tokens": self.last_prompt_tokens, "estimated_cost_usd": self.estimated_cost_usd, "cost_status": self.cost_status, - "memory_flushed": self.memory_flushed, + "expiry_finalized": self.expiry_finalized, "suspended": self.suspended, "resume_pending": self.resume_pending, "resume_reason": self.resume_reason, @@ -531,7 +531,7 @@ class SessionEntry: last_prompt_tokens=data.get("last_prompt_tokens", 0), estimated_cost_usd=data.get("estimated_cost_usd", 0.0), cost_status=data.get("cost_status", "unknown"), - memory_flushed=data.get("memory_flushed", False), + expiry_finalized=data.get("expiry_finalized", data.get("memory_flushed", False)), suspended=data.get("suspended", False), resume_pending=data.get("resume_pending", False), resume_reason=data.get("resume_reason"), diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 1962ce0b7a..4b7ff9fba7 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -612,14 +612,6 @@ DEFAULT_CONFIG = { "timeout": 30, "extra_body": {}, }, - "flush_memories": { - "provider": "auto", - "model": "", - "base_url": "", - "api_key": "", - "timeout": 30, - "extra_body": {}, - }, "title_generation": { "provider": "auto", "model": "", diff --git a/hermes_cli/main.py b/hermes_cli/main.py index 55245228fe..7284f8814f 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -1707,7 +1707,6 @@ _AUX_TASKS: list[tuple[str, str, str]] = [ ("session_search", "Session search", "past-conversation recall"), ("approval", "Approval", "smart command approval"), ("mcp", "MCP", "MCP tool reasoning"), - ("flush_memories", "Flush memories", "memory consolidation"), ("title_generation", "Title generation", "session titles"), ("skills_hub", "Skills hub", "skills search/install"), ] diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index 86d7ad5efb..f8687eb2bd 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ 
-43,7 +43,7 @@ _TIMEOUT = 30.0 # --------------------------------------------------------------------------- # Process-level atexit safety net — ensures pending sessions are committed # even if shutdown_memory_provider is never called (e.g. gateway crash, -# SIGKILL, or exception in _async_flush_memories preventing shutdown). +# SIGKILL, or exception in the session expiry watcher preventing shutdown). # --------------------------------------------------------------------------- _last_active_provider: Optional["OpenVikingMemoryProvider"] = None diff --git a/run_agent.py b/run_agent.py index 8db343a2a5..55cbe04193 100644 --- a/run_agent.py +++ b/run_agent.py @@ -1578,7 +1578,6 @@ class AIAgent: self._memory_enabled = False self._user_profile_enabled = False self._memory_nudge_interval = 10 - self._memory_flush_min_turns = 6 self._turns_since_memory = 0 self._iters_since_skill = 0 if not skip_memory: @@ -1587,7 +1586,6 @@ class AIAgent: self._memory_enabled = mem_config.get("memory_enabled", False) self._user_profile_enabled = mem_config.get("user_profile_enabled", False) self._memory_nudge_interval = int(mem_config.get("nudge_interval", 10)) - self._memory_flush_min_turns = int(mem_config.get("flush_min_turns", 6)) if self._memory_enabled or self._user_profile_enabled: from tools.memory_tool import MemoryStore self._memory_store = MemoryStore( @@ -2427,23 +2425,12 @@ class AIAgent: # above guarantees aux_context >= MINIMUM_CONTEXT_LENGTH, # so the new threshold is always >= 64K. # - # Headroom: the threshold budgets RAW MESSAGES only, but the - # actual request auxiliary callers send also includes the - # system prompt and every tool schema. With 50+ tools that - # overhead can be 25-30K tokens; setting new_threshold = - # aux_context directly would let messages grow right to the - # aux limit and the first compression/flush request would - # overflow with HTTP 400. Subtract a dynamic headroom - # estimate so the full request still fits. 
- from agent.model_metadata import estimate_request_tokens_rough - tool_overhead = estimate_request_tokens_rough([], tools=self.tools) - # System prompt is not yet built at __init__ time; allow a - # conservative 10K budget (SOUL/AGENTS.md + memory snapshot + - # skills guidance) plus 2K for the flush instruction and a - # small safety margin. - headroom = tool_overhead + 12_000 + # The compression summariser sends a single user-role + # prompt (no system prompt, no tools) to the aux model, so + # new_threshold == aux_context is safe: the request is + # the raw messages plus a small summarisation instruction. old_threshold = threshold - new_threshold = max(aux_context - headroom, MINIMUM_CONTEXT_LENGTH) + new_threshold = aux_context self.context_compressor.threshold_tokens = new_threshold # Keep threshold_percent in sync so future main-model # context_length changes (update_model) re-derive from a @@ -7927,254 +7914,6 @@ class AIAgent: """ return self.api_mode != "codex_responses" - def flush_memories(self, messages: list = None, min_turns: int = None): - """Give the model one turn to persist memories before context is lost. - - Called before compression, session reset, or CLI exit. Injects a flush - message, makes one API call, executes any memory tool calls, then - strips all flush artifacts from the message list. - - Args: - messages: The current conversation messages. If None, uses - self._session_messages (last run_conversation state). - min_turns: Minimum user turns required to trigger the flush. - None = use config value (flush_min_turns). - 0 = always flush (used for compression). 
- """ - if self._memory_flush_min_turns == 0 and min_turns is None: - return - if "memory" not in self.valid_tool_names or not self._memory_store: - return - effective_min = min_turns if min_turns is not None else self._memory_flush_min_turns - if self._user_turn_count < effective_min: - return - - if messages is None: - messages = getattr(self, '_session_messages', None) - if not messages or len(messages) < 3: - return - - flush_content = ( - "[System: The session is being compressed. " - "Save anything worth remembering — prioritize user preferences, " - "corrections, and recurring patterns over task-specific details.]" - ) - _sentinel = f"__flush_{id(self)}_{time.monotonic()}" - flush_msg = {"role": "user", "content": flush_content, "_flush_sentinel": _sentinel} - messages.append(flush_msg) - - try: - # Build API messages for the flush call - _needs_sanitize = self._should_sanitize_tool_calls() - api_messages = [] - for msg in messages: - api_msg = msg.copy() - self._copy_reasoning_content_for_api(msg, api_msg) - api_msg.pop("reasoning", None) - api_msg.pop("finish_reason", None) - api_msg.pop("_flush_sentinel", None) - api_msg.pop("_thinking_prefill", None) - if _needs_sanitize: - self._sanitize_tool_calls_for_strict_api(api_msg) - api_messages.append(api_msg) - - if self._cached_system_prompt: - api_messages = [{"role": "system", "content": self._cached_system_prompt}] + api_messages - - # Make one API call with only the memory tool available - memory_tool_def = None - for t in (self.tools or []): - if t.get("function", {}).get("name") == "memory": - memory_tool_def = t - break - - if not memory_tool_def: - messages.pop() # remove flush msg - return - - # Use auxiliary client for the flush call when available -- - # it's cheaper and avoids Codex Responses API incompatibility. 
- from agent.auxiliary_client import ( - call_llm as _call_llm, - _fixed_temperature_for_model, - OMIT_TEMPERATURE, - ) - _aux_available = True - # Kimi models manage temperature server-side — omit it entirely. - # Other models with a fixed contract get that value; everyone else - # gets the historical 0.3 default. - _fixed_temp = _fixed_temperature_for_model(self.model, self.base_url) - _omit_temperature = _fixed_temp is OMIT_TEMPERATURE - if _omit_temperature: - _flush_temperature = None - elif _fixed_temp is not None: - _flush_temperature = _fixed_temp - else: - _flush_temperature = 0.3 - aux_error = None - try: - response = _call_llm( - task="flush_memories", - messages=api_messages, - tools=[memory_tool_def], - temperature=_flush_temperature, - max_tokens=5120, - # timeout resolved from auxiliary.flush_memories.timeout config - ) - except Exception as e: - aux_error = e - _aux_available = False - response = None - - if not _aux_available and self.api_mode == "codex_responses": - # No auxiliary client -- use the Codex Responses path directly. - # The Responses API does not accept `temperature` on any - # supported backend (chatgpt.com/backend-api/codex rejects it - # outright; api.openai.com + gpt-5/o-series reasoning models - # and Copilot Responses reject it on reasoning models). The - # transport intentionally never sets it — strip any leftover - # here so the flush fallback matches the main-loop behavior. 
- codex_kwargs = self._build_api_kwargs(api_messages) - _ct_flush = self._get_transport() - if _ct_flush is not None: - codex_kwargs["tools"] = _ct_flush.convert_tools([memory_tool_def]) - elif not codex_kwargs.get("tools"): - codex_kwargs["tools"] = [memory_tool_def] - codex_kwargs.pop("temperature", None) - if "max_output_tokens" in codex_kwargs: - codex_kwargs["max_output_tokens"] = 5120 - response = self._run_codex_stream(codex_kwargs) - elif not _aux_available and self.api_mode == "anthropic_messages": - # Native Anthropic — use the transport for kwargs - _tflush = self._get_transport() - ant_kwargs = _tflush.build_kwargs( - model=self.model, messages=api_messages, - tools=[memory_tool_def], max_tokens=5120, - reasoning_config=None, - preserve_dots=self._anthropic_preserve_dots(), - ) - response = self._anthropic_messages_create(ant_kwargs) - elif not _aux_available: - api_kwargs = { - "model": self.model, - "messages": api_messages, - "tools": [memory_tool_def], - **self._max_tokens_param(5120), - } - if _flush_temperature is not None: - api_kwargs["temperature"] = _flush_temperature - from agent.auxiliary_client import _get_task_timeout - response = self._ensure_primary_openai_client(reason="flush_memories").chat.completions.create( - **api_kwargs, timeout=_get_task_timeout("flush_memories") - ) - - if aux_error is not None: - logger.warning("Auxiliary memory flush failed; used fallback path: %s", aux_error) - self._emit_auxiliary_failure("memory flush", aux_error) - - def _openai_tool_calls(resp): - if resp is not None and hasattr(resp, "choices") and resp.choices: - msg = getattr(resp.choices[0], "message", None) - calls = getattr(msg, "tool_calls", None) - if calls: - return calls - return [] - - def _codex_output_tool_calls(resp): - calls = [] - for item in getattr(resp, "output", []) or []: - if getattr(item, "type", None) == "function_call": - calls.append(SimpleNamespace( - id=getattr(item, "call_id", None), - type="function", - 
function=SimpleNamespace( - name=getattr(item, "name", ""), - arguments=getattr(item, "arguments", "{}"), - ), - )) - return calls - - # Extract tool calls from the response, handling all API formats - tool_calls = [] - if self.api_mode == "codex_responses" and not _aux_available: - _ct_flush = self._get_transport() - _cnr_flush = _ct_flush.normalize_response(response) if _ct_flush is not None else None - if _cnr_flush and _cnr_flush.tool_calls: - tool_calls = [ - SimpleNamespace( - id=tc.id, type="function", - function=SimpleNamespace(name=tc.name, arguments=tc.arguments), - ) for tc in _cnr_flush.tool_calls - ] - else: - tool_calls = _codex_output_tool_calls(response) - elif self.api_mode == "anthropic_messages" and not _aux_available: - _tfn = self._get_transport() - _flush_result = _tfn.normalize_response(response, strip_tool_prefix=self._is_anthropic_oauth) - if _flush_result and _flush_result.tool_calls: - tool_calls = [ - SimpleNamespace( - id=tc.id, type="function", - function=SimpleNamespace(name=tc.name, arguments=tc.arguments), - ) for tc in _flush_result.tool_calls - ] - elif self.api_mode in ("chat_completions", "bedrock_converse"): - # chat_completions / bedrock — normalize through transport - _tfn = self._get_transport() - _flush_result = _tfn.normalize_response(response) if _tfn is not None else None - if _flush_result and _flush_result.tool_calls: - tool_calls = _flush_result.tool_calls - else: - tool_calls = _openai_tool_calls(response) - elif _aux_available and hasattr(response, "choices") and response.choices: - # Auxiliary client returned OpenAI-shaped response while main - # api_mode is codex/anthropic — extract tool_calls from .choices - tool_calls = _openai_tool_calls(response) - - for tc in tool_calls: - if tc.function.name == "memory": - try: - args = json.loads(tc.function.arguments) - flush_target = args.get("target", "memory") - from tools.memory_tool import memory_tool as _memory_tool - _memory_tool( - action=args.get("action"), - 
target=flush_target, - content=args.get("content"), - old_text=args.get("old_text"), - store=self._memory_store, - ) - if self._memory_manager and args.get("action") in ("add", "replace"): - try: - self._memory_manager.on_memory_write( - args.get("action", ""), - flush_target, - args.get("content", ""), - metadata=self._build_memory_write_metadata( - write_origin="memory_flush", - execution_context="flush_memories", - ), - ) - except Exception: - pass - if not self.quiet_mode: - print(f" 🧠 Memory flush: saved to {args.get('target', 'memory')}") - except Exception as e: - logger.warning("Memory flush tool call failed: %s", e) - self._emit_auxiliary_failure("memory flush tool", e) - except Exception as e: - logger.warning("Memory flush API call failed: %s", e) - self._emit_auxiliary_failure("memory flush", e) - finally: - # Strip flush artifacts: remove everything from the flush message onward. - # Use sentinel marker instead of identity check for robustness. - while messages and messages[-1].get("_flush_sentinel") != _sentinel: - messages.pop() - if not messages: - break - if messages and messages[-1].get("_flush_sentinel") == _sentinel: - messages.pop() - def _compress_context(self, messages: list, system_message: str, *, approx_tokens: int = None, task_id: str = "default", focus_topic: str = None) -> tuple: """Compress conversation context and split the session in SQLite. 
@@ -8193,8 +7932,6 @@ class AIAgent: f"{approx_tokens:,}" if approx_tokens else "unknown", self.model, focus_topic, ) - # Pre-compression memory flush: let the model save memories before they're lost - self.flush_memories(messages, min_turns=0) # Notify external memory provider before compression discards context if self._memory_manager: diff --git a/tests/agent/test_auxiliary_named_custom_providers.py b/tests/agent/test_auxiliary_named_custom_providers.py index 5152428b6f..79f8b2f7e7 100644 --- a/tests/agent/test_auxiliary_named_custom_providers.py +++ b/tests/agent/test_auxiliary_named_custom_providers.py @@ -386,7 +386,7 @@ class TestProvidersDictApiModeAnthropicMessages: }, }, "auxiliary": { - "flush_memories": { + "compression": { "provider": "myrelay", "model": "claude-sonnet-4.6", }, @@ -399,11 +399,11 @@ class TestProvidersDictApiModeAnthropicMessages: AnthropicAuxiliaryClient, AsyncAnthropicAuxiliaryClient, ) - async_client, async_model = get_async_text_auxiliary_client("flush_memories") + async_client, async_model = get_async_text_auxiliary_client("compression") assert isinstance(async_client, AsyncAnthropicAuxiliaryClient) assert async_model == "claude-sonnet-4.6" - sync_client, sync_model = get_text_auxiliary_client("flush_memories") + sync_client, sync_model = get_text_auxiliary_client("compression") assert isinstance(sync_client, AnthropicAuxiliaryClient) assert sync_model == "claude-sonnet-4.6" diff --git a/tests/agent/test_unsupported_temperature_retry.py b/tests/agent/test_unsupported_temperature_retry.py index 1e22a4d801..82d8d3208d 100644 --- a/tests/agent/test_unsupported_temperature_retry.py +++ b/tests/agent/test_unsupported_temperature_retry.py @@ -1,7 +1,7 @@ """Regression tests for the universal "unsupported temperature" retry in ``agent.auxiliary_client``. -Auxiliary callers (``flush_memories``, context compression, session search, +Auxiliary callers (context compression, session search, web extract summarisation, etc.) 
hardcode ``temperature=0.3`` for historical reasons. Several provider/model combinations reject ``temperature`` with a 400: @@ -100,7 +100,7 @@ class TestCallLlmUnsupportedTemperatureRetry: side_effect=lambda resp, _task: resp), ): result = call_llm( - task="flush_memories", + task="compression", messages=[{"role": "user", "content": "remember this"}], temperature=0.3, max_tokens=500, @@ -136,7 +136,7 @@ class TestCallLlmUnsupportedTemperatureRetry: ): with pytest.raises(RuntimeError, match="Invalid value"): call_llm( - task="flush_memories", + task="compression", messages=[{"role": "user", "content": "x"}], temperature=0.3, max_tokens=500, @@ -166,7 +166,7 @@ class TestCallLlmUnsupportedTemperatureRetry: ): with pytest.raises(RuntimeError): call_llm( - task="flush_memories", + task="compression", messages=[{"role": "user", "content": "x"}], temperature=None, # explicit: no temperature sent max_tokens=500, diff --git a/tests/cli/test_cli_new_session.py b/tests/cli/test_cli_new_session.py index dbfc07db21..63d07d26d2 100644 --- a/tests/cli/test_cli_new_session.py +++ b/tests/cli/test_cli_new_session.py @@ -33,7 +33,6 @@ class _FakeAgent: self._todo_store.write( [{"id": "t1", "content": "unfinished task", "status": "in_progress"}] ) - self.flush_memories = MagicMock() self.commit_memory_session = MagicMock() self._invalidate_system_prompt = MagicMock() @@ -157,7 +156,6 @@ def test_new_command_creates_real_fresh_session_and_resets_agent_state(tmp_path) assert cli.agent._todo_store.read() == [] assert cli.session_start > old_session_start assert cli.agent.session_start == cli.session_start - cli.agent.flush_memories.assert_called_once_with([{"role": "user", "content": "hello"}]) cli.agent._invalidate_system_prompt.assert_called_once() diff --git a/tests/gateway/test_async_memory_flush.py b/tests/gateway/test_async_memory_flush.py deleted file mode 100644 index 0d73194904..0000000000 --- a/tests/gateway/test_async_memory_flush.py +++ /dev/null @@ -1,249 +0,0 @@ 
-"""Tests for proactive memory flush on session expiry. - -Verifies that: -1. _is_session_expired() works from a SessionEntry alone (no source needed) -2. The sync callback is no longer called in get_or_create_session -3. memory_flushed flag persists across save/load cycles (prevents restart re-flush) -4. The background watcher can detect expired sessions -""" - -import pytest -from datetime import datetime, timedelta -from pathlib import Path -from unittest.mock import patch, MagicMock - -from gateway.config import Platform, GatewayConfig, SessionResetPolicy -from gateway.session import SessionSource, SessionStore, SessionEntry - - -@pytest.fixture() -def idle_store(tmp_path): - """SessionStore with a 60-minute idle reset policy.""" - config = GatewayConfig( - default_reset_policy=SessionResetPolicy(mode="idle", idle_minutes=60), - ) - with patch("gateway.session.SessionStore._ensure_loaded"): - s = SessionStore(sessions_dir=tmp_path, config=config) - s._db = None - s._loaded = True - return s - - -@pytest.fixture() -def no_reset_store(tmp_path): - """SessionStore with no reset policy (mode=none).""" - config = GatewayConfig( - default_reset_policy=SessionResetPolicy(mode="none"), - ) - with patch("gateway.session.SessionStore._ensure_loaded"): - s = SessionStore(sessions_dir=tmp_path, config=config) - s._db = None - s._loaded = True - return s - - -class TestIsSessionExpired: - """_is_session_expired should detect expiry from entry alone.""" - - def test_idle_session_expired(self, idle_store): - entry = SessionEntry( - session_key="agent:main:telegram:dm", - session_id="sid_1", - created_at=datetime.now() - timedelta(hours=3), - updated_at=datetime.now() - timedelta(minutes=120), - platform=Platform.TELEGRAM, - chat_type="dm", - ) - assert idle_store._is_session_expired(entry) is True - - def test_active_session_not_expired(self, idle_store): - entry = SessionEntry( - session_key="agent:main:telegram:dm", - session_id="sid_2", - created_at=datetime.now() - 
timedelta(hours=1), - updated_at=datetime.now() - timedelta(minutes=10), - platform=Platform.TELEGRAM, - chat_type="dm", - ) - assert idle_store._is_session_expired(entry) is False - - def test_none_mode_never_expires(self, no_reset_store): - entry = SessionEntry( - session_key="agent:main:telegram:dm", - session_id="sid_3", - created_at=datetime.now() - timedelta(days=30), - updated_at=datetime.now() - timedelta(days=30), - platform=Platform.TELEGRAM, - chat_type="dm", - ) - assert no_reset_store._is_session_expired(entry) is False - - def test_active_processes_prevent_expiry(self, idle_store): - """Sessions with active background processes should never expire.""" - idle_store._has_active_processes_fn = lambda key: True - entry = SessionEntry( - session_key="agent:main:telegram:dm", - session_id="sid_4", - created_at=datetime.now() - timedelta(hours=5), - updated_at=datetime.now() - timedelta(hours=5), - platform=Platform.TELEGRAM, - chat_type="dm", - ) - assert idle_store._is_session_expired(entry) is False - - def test_daily_mode_expired(self, tmp_path): - """Daily mode should expire sessions from before today's reset hour.""" - config = GatewayConfig( - default_reset_policy=SessionResetPolicy(mode="daily", at_hour=4), - ) - with patch("gateway.session.SessionStore._ensure_loaded"): - store = SessionStore(sessions_dir=tmp_path, config=config) - store._db = None - store._loaded = True - - entry = SessionEntry( - session_key="agent:main:telegram:dm", - session_id="sid_5", - created_at=datetime.now() - timedelta(days=2), - updated_at=datetime.now() - timedelta(days=2), - platform=Platform.TELEGRAM, - chat_type="dm", - ) - assert store._is_session_expired(entry) is True - - -class TestGetOrCreateSessionNoCallback: - """get_or_create_session should NOT call a sync flush callback.""" - - def test_auto_reset_creates_new_session_after_flush(self, idle_store): - """When a flushed session auto-resets, a new session_id is created.""" - source = SessionSource( - 
platform=Platform.TELEGRAM, - chat_id="123", - chat_type="dm", - ) - # Create initial session - entry1 = idle_store.get_or_create_session(source) - old_sid = entry1.session_id - - # Simulate the watcher having flushed it - entry1.memory_flushed = True - - # Simulate the session going idle - entry1.updated_at = datetime.now() - timedelta(minutes=120) - idle_store._save() - - # Next call should auto-reset - entry2 = idle_store.get_or_create_session(source) - assert entry2.session_id != old_sid - assert entry2.was_auto_reset is True - # New session starts with memory_flushed=False - assert entry2.memory_flushed is False - - def test_no_sync_callback_invoked(self, idle_store): - """No synchronous callback should block during auto-reset.""" - source = SessionSource( - platform=Platform.TELEGRAM, - chat_id="123", - chat_type="dm", - ) - entry1 = idle_store.get_or_create_session(source) - entry1.updated_at = datetime.now() - timedelta(minutes=120) - idle_store._save() - - # Verify no _on_auto_reset attribute - assert not hasattr(idle_store, '_on_auto_reset') - - # This should NOT block (no sync LLM call) - entry2 = idle_store.get_or_create_session(source) - assert entry2.was_auto_reset is True - - -class TestMemoryFlushedFlag: - """The memory_flushed flag on SessionEntry prevents double-flushing.""" - - def test_defaults_to_false(self): - entry = SessionEntry( - session_key="agent:main:telegram:dm:123", - session_id="sid_new", - created_at=datetime.now(), - updated_at=datetime.now(), - platform=Platform.TELEGRAM, - chat_type="dm", - ) - assert entry.memory_flushed is False - - def test_persists_through_save_load(self, idle_store): - """memory_flushed=True must survive a save/load cycle (simulates restart).""" - key = "agent:main:discord:thread:789" - entry = SessionEntry( - session_key=key, - session_id="sid_flushed", - created_at=datetime.now() - timedelta(hours=5), - updated_at=datetime.now() - timedelta(hours=5), - platform=Platform.DISCORD, - chat_type="thread", - 
memory_flushed=True, - ) - idle_store._entries[key] = entry - idle_store._save() - - # Simulate restart: clear in-memory state, reload from disk - idle_store._entries.clear() - idle_store._loaded = False - idle_store._ensure_loaded() - - reloaded = idle_store._entries[key] - assert reloaded.memory_flushed is True - - def test_unflushed_entry_survives_restart_as_unflushed(self, idle_store): - """An entry without memory_flushed stays False after reload.""" - key = "agent:main:telegram:dm:456" - entry = SessionEntry( - session_key=key, - session_id="sid_not_flushed", - created_at=datetime.now() - timedelta(hours=2), - updated_at=datetime.now() - timedelta(hours=2), - platform=Platform.TELEGRAM, - chat_type="dm", - ) - idle_store._entries[key] = entry - idle_store._save() - - idle_store._entries.clear() - idle_store._loaded = False - idle_store._ensure_loaded() - - reloaded = idle_store._entries[key] - assert reloaded.memory_flushed is False - - def test_roundtrip_to_dict_from_dict(self): - """to_dict/from_dict must preserve memory_flushed.""" - entry = SessionEntry( - session_key="agent:main:telegram:dm:999", - session_id="sid_rt", - created_at=datetime.now(), - updated_at=datetime.now(), - platform=Platform.TELEGRAM, - chat_type="dm", - memory_flushed=True, - ) - d = entry.to_dict() - assert d["memory_flushed"] is True - - restored = SessionEntry.from_dict(d) - assert restored.memory_flushed is True - - def test_legacy_entry_without_field_defaults_false(self): - """Old sessions.json entries missing memory_flushed should default to False.""" - data = { - "session_key": "agent:main:telegram:dm:legacy", - "session_id": "sid_legacy", - "created_at": datetime.now().isoformat(), - "updated_at": datetime.now().isoformat(), - "platform": "telegram", - "chat_type": "dm", - # no memory_flushed key - } - entry = SessionEntry.from_dict(data) - assert entry.memory_flushed is False diff --git a/tests/gateway/test_flush_memory_stale_guard.py 
b/tests/gateway/test_flush_memory_stale_guard.py deleted file mode 100644 index c4e4e1fb6d..0000000000 --- a/tests/gateway/test_flush_memory_stale_guard.py +++ /dev/null @@ -1,240 +0,0 @@ -"""Tests for memory flush stale-overwrite prevention (#2670). - -Verifies that: -1. Cron sessions are skipped (no flush for headless cron runs) -2. Current memory state is injected into the flush prompt so the - flush agent can see what's already saved and avoid overwrites -3. The flush still works normally when memory files don't exist -""" - -import sys -import types -import pytest -from pathlib import Path -from unittest.mock import MagicMock, patch, call - - -@pytest.fixture(autouse=True) -def _mock_dotenv(monkeypatch): - """gateway.run imports dotenv at module level; stub it so tests run without the package.""" - fake = types.ModuleType("dotenv") - fake.load_dotenv = lambda *a, **kw: None - monkeypatch.setitem(sys.modules, "dotenv", fake) - - -def _make_runner(): - from gateway.run import GatewayRunner - - runner = object.__new__(GatewayRunner) - runner._honcho_managers = {} - runner._honcho_configs = {} - runner._running_agents = {} - runner._pending_messages = {} - runner._pending_approvals = {} - runner.adapters = {} - runner.hooks = MagicMock() - runner.session_store = MagicMock() - return runner - - -_TRANSCRIPT_4_MSGS = [ - {"role": "user", "content": "hello"}, - {"role": "assistant", "content": "hi there"}, - {"role": "user", "content": "remember my name is Alice"}, - {"role": "assistant", "content": "Got it, Alice!"}, -] - - -class TestCronSessionBypass: - """Cron sessions should never trigger a memory flush.""" - - def test_cron_session_skipped(self): - runner = _make_runner() - runner._flush_memories_for_session("cron_job123_20260323_120000") - # session_store.load_transcript should never be called - runner.session_store.load_transcript.assert_not_called() - - def test_cron_session_with_prefix_skipped(self): - """Cron sessions with different prefixes are still 
skipped.""" - runner = _make_runner() - runner._flush_memories_for_session("cron_daily_20260323") - runner.session_store.load_transcript.assert_not_called() - - def test_non_cron_session_proceeds(self): - """Non-cron sessions should still attempt the flush.""" - runner = _make_runner() - runner.session_store.load_transcript.return_value = [] - runner._flush_memories_for_session("session_abc123") - runner.session_store.load_transcript.assert_called_once_with("session_abc123") - - -def _make_flush_context(monkeypatch, memory_dir=None): - """Return (runner, tmp_agent, fake_run_agent) with run_agent mocked in sys.modules.""" - tmp_agent = MagicMock() - fake_run_agent = types.ModuleType("run_agent") - fake_run_agent.AIAgent = MagicMock(return_value=tmp_agent) - monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent) - - runner = _make_runner() - runner.session_store.load_transcript.return_value = _TRANSCRIPT_4_MSGS - return runner, tmp_agent, memory_dir - - -class TestMemoryInjection: - """The flush prompt should include current memory state from disk.""" - - def test_memory_content_injected_into_flush_prompt(self, tmp_path, monkeypatch): - """When memory files exist, their content appears in the flush prompt.""" - memory_dir = tmp_path / "memories" - memory_dir.mkdir() - (memory_dir / "MEMORY.md").write_text("Agent knows Python\n§\nUser prefers dark mode") - (memory_dir / "USER.md").write_text("Name: Alice\n§\nTimezone: PST") - - runner, tmp_agent, _ = _make_flush_context(monkeypatch, memory_dir) - - with ( - patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "k"}), - patch("gateway.run._resolve_gateway_model", return_value="test-model"), - patch.dict("sys.modules", {"tools.memory_tool": MagicMock(get_memory_dir=lambda: memory_dir)}), - ): - runner._flush_memories_for_session("session_123") - - tmp_agent.run_conversation.assert_called_once() - flush_prompt = tmp_agent.run_conversation.call_args.kwargs.get("user_message", "") - - assert 
"Agent knows Python" in flush_prompt - assert "User prefers dark mode" in flush_prompt - assert "Name: Alice" in flush_prompt - assert "Timezone: PST" in flush_prompt - assert "Do NOT overwrite or remove entries" in flush_prompt - assert "current live state of memory" in flush_prompt - - def test_flush_works_without_memory_files(self, tmp_path, monkeypatch): - """When no memory files exist, flush still runs without the guard.""" - empty_dir = tmp_path / "no_memories" - empty_dir.mkdir() - - runner, tmp_agent, _ = _make_flush_context(monkeypatch) - - with ( - patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "k"}), - patch("gateway.run._resolve_gateway_model", return_value="test-model"), - patch.dict("sys.modules", {"tools.memory_tool": MagicMock(get_memory_dir=lambda: empty_dir)}), - ): - runner._flush_memories_for_session("session_456") - - tmp_agent.run_conversation.assert_called_once() - flush_prompt = tmp_agent.run_conversation.call_args.kwargs.get("user_message", "") - assert "Do NOT overwrite or remove entries" not in flush_prompt - assert "Review the conversation above" in flush_prompt - - def test_empty_memory_files_no_injection(self, tmp_path, monkeypatch): - """Empty memory files should not trigger the guard section.""" - memory_dir = tmp_path / "memories" - memory_dir.mkdir() - (memory_dir / "MEMORY.md").write_text("") - (memory_dir / "USER.md").write_text(" \n ") # whitespace only - - runner, tmp_agent, _ = _make_flush_context(monkeypatch) - - with ( - patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "k"}), - patch("gateway.run._resolve_gateway_model", return_value="test-model"), - patch.dict("sys.modules", {"tools.memory_tool": MagicMock(get_memory_dir=lambda: memory_dir)}), - ): - runner._flush_memories_for_session("session_789") - - tmp_agent.run_conversation.assert_called_once() - flush_prompt = tmp_agent.run_conversation.call_args.kwargs.get("user_message", "") - assert "current live state of 
memory" not in flush_prompt - - -class TestFlushAgentSilenced: - """The flush agent must not produce any terminal output.""" - - def test_print_fn_set_to_noop(self, tmp_path, monkeypatch): - """_print_fn on the flush agent must be a no-op so tool output never leaks.""" - runner = _make_runner() - runner.session_store.load_transcript.return_value = _TRANSCRIPT_4_MSGS - - captured_agent = {} - - def _fake_ai_agent(*args, **kwargs): - agent = MagicMock() - captured_agent["instance"] = agent - return agent - - fake_run_agent = types.ModuleType("run_agent") - fake_run_agent.AIAgent = _fake_ai_agent - monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent) - - with ( - patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "k"}), - patch("gateway.run._resolve_gateway_model", return_value="test-model"), - patch.dict("sys.modules", {"tools.memory_tool": MagicMock(get_memory_dir=lambda: tmp_path)}), - ): - runner._flush_memories_for_session("session_silent") - - agent = captured_agent["instance"] - assert agent._print_fn is not None, "_print_fn should be overridden to suppress output" - # Confirm it is callable and produces no output (no exception) - agent._print_fn("should be silenced") - - def test_kawaii_spinner_respects_print_fn(self): - """KawaiiSpinner must route all output through print_fn when supplied.""" - from agent.display import KawaiiSpinner - - written = [] - spinner = KawaiiSpinner("test", print_fn=lambda *a, **kw: written.append(a)) - spinner._write("hello") - assert written == [("hello",)], "spinner should route through print_fn" - - # A no-op print_fn must produce no output to stdout - import io, sys - buf = io.StringIO() - old_stdout = sys.stdout - sys.stdout = buf - try: - silent_spinner = KawaiiSpinner("silent", print_fn=lambda *a, **kw: None) - silent_spinner._write("should not appear") - silent_spinner.stop("done") - finally: - sys.stdout = old_stdout - assert buf.getvalue() == "", "no-op print_fn spinner must not write to 
stdout" - - def test_flush_agent_closes_resources_after_run(self, monkeypatch): - """Memory flush should close temporary agent resources after the turn.""" - runner, tmp_agent, _ = _make_flush_context(monkeypatch) - tmp_agent.shutdown_memory_provider = MagicMock() - tmp_agent.close = MagicMock() - - with ( - patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "k"}), - patch("gateway.run._resolve_gateway_model", return_value="test-model"), - patch.dict("sys.modules", {"tools.memory_tool": MagicMock(get_memory_dir=lambda: Path("/nonexistent"))}), - ): - runner._flush_memories_for_session("session_cleanup") - - tmp_agent.shutdown_memory_provider.assert_called_once() - tmp_agent.close.assert_called_once() - - -class TestFlushPromptStructure: - """Verify the flush prompt retains its core instructions.""" - - def test_core_instructions_present(self, monkeypatch): - """The flush prompt should still contain the original guidance.""" - runner, tmp_agent, _ = _make_flush_context(monkeypatch) - - with ( - patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "k"}), - patch("gateway.run._resolve_gateway_model", return_value="test-model"), - patch.dict("sys.modules", {"tools.memory_tool": MagicMock(get_memory_dir=lambda: Path("/nonexistent"))}), - ): - runner._flush_memories_for_session("session_struct") - - flush_prompt = tmp_agent.run_conversation.call_args.kwargs.get("user_message", "") - assert "automatically reset" in flush_prompt - assert "Save any important facts" in flush_prompt - assert "consider saving it as a skill" in flush_prompt - assert "Do NOT respond to the user" in flush_prompt diff --git a/tests/gateway/test_resume_command.py b/tests/gateway/test_resume_command.py index c451b3fe3d..42377325e9 100644 --- a/tests/gateway/test_resume_command.py +++ b/tests/gateway/test_resume_command.py @@ -4,7 +4,7 @@ Tests the _handle_resume_command handler (switch to a previously-named session) across gateway messenger platforms. 
""" -from unittest.mock import MagicMock, AsyncMock +from unittest.mock import MagicMock import pytest @@ -53,9 +53,6 @@ def _make_runner(session_db=None, current_session_id="current_session_001", mock_store.switch_session.return_value = mock_session_entry runner.session_store = mock_store - # Stub out memory flushing - runner._async_flush_memories = AsyncMock() - return runner @@ -233,28 +230,3 @@ class TestHandleResumeCommand: assert real_key not in runner._running_agents db.close() - - @pytest.mark.asyncio - async def test_resume_flushes_memories(self, tmp_path): - """Resume should flush memories from the current session before switching.""" - from hermes_state import SessionDB - - db = SessionDB(db_path=tmp_path / "state.db") - db.create_session("old_session", "telegram") - db.set_session_title("old_session", "Old Work") - db.create_session("current_session_001", "telegram") - - event = _make_event(text="/resume Old Work") - runner = _make_runner( - session_db=db, - current_session_id="current_session_001", - event=event, - ) - - await runner._handle_resume_command(event) - - runner._async_flush_memories.assert_called_once_with( - "current_session_001", - "agent:main:telegram:dm:67890", - ) - db.close() diff --git a/tests/gateway/test_session_boundary_hooks.py b/tests/gateway/test_session_boundary_hooks.py index 52a5238cd3..255795492f 100644 --- a/tests/gateway/test_session_boundary_hooks.py +++ b/tests/gateway/test_session_boundary_hooks.py @@ -177,8 +177,8 @@ async def test_idle_expiry_fires_finalize_hook(mock_invoke_hook): its reset policy (idle timeout, scheduled reset), it must fire ``on_session_finalize`` so plugin providers get the same final-pass extraction opportunity they'd get from /new or CLI shutdown. Before - the fix, the expiry path flushed memories and evicted the agent but - silently skipped the hook. + the fix, the expiry path evicted the agent but silently skipped the + hook. 
""" from datetime import datetime, timedelta @@ -200,7 +200,7 @@ async def test_idle_expiry_fires_finalize_hook(mock_invoke_hook): platform=Platform.TELEGRAM, chat_type="dm", ) - expired_entry.memory_flushed = False + expired_entry.expiry_finalized = False runner.session_store = MagicMock() runner.session_store._ensure_loaded = MagicMock() @@ -211,24 +211,24 @@ async def test_idle_expiry_fires_finalize_hook(mock_invoke_hook): runner.session_store._lock.__exit__ = MagicMock(return_value=None) runner.session_store._save = MagicMock() - runner._async_flush_memories = AsyncMock() runner._evict_cached_agent = MagicMock() runner._cleanup_agent_resources = MagicMock() runner._sweep_idle_cached_agents = MagicMock(return_value=0) # The watcher starts with `await asyncio.sleep(60)` and loops while - # `self._running`. Patch sleep so the 60s initial delay is instant, then - # flip `_running` false inside the flush call so the loop exits cleanly - # after one pass. + # `self._running`. Patch sleep so the 60s initial delay is instant, and + # make the expiry hook invocation flip `_running` false so the loop + # exits cleanly after one pass. 
_orig_sleep = __import__("asyncio").sleep async def _fast_sleep(_): await _orig_sleep(0) - async def _flush_and_stop(session_id, key): - runner._running = False # terminate the loop after this iteration + def _hook_and_stop(*a, **kw): + runner._running = False + return None - runner._async_flush_memories = AsyncMock(side_effect=_flush_and_stop) + mock_invoke_hook.side_effect = _hook_and_stop with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep): await runner._session_expiry_watcher(interval=0) diff --git a/tests/gateway/test_session_boundary_security_state.py b/tests/gateway/test_session_boundary_security_state.py index 9908badead..eb1b99866a 100644 --- a/tests/gateway/test_session_boundary_security_state.py +++ b/tests/gateway/test_session_boundary_security_state.py @@ -1,7 +1,7 @@ """Regression tests for approval-state cleanup on session boundaries.""" from datetime import datetime -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import MagicMock import pytest @@ -72,7 +72,6 @@ def _make_resume_runner(): runner = object.__new__(GatewayRunner) runner.adapters = {} runner._background_tasks = set() - runner._async_flush_memories = AsyncMock() runner._running_agents = {} runner._running_agents_ts = {} runner._busy_ack_ts = {} diff --git a/tests/run_agent/test_compress_focus_plugin_fallback.py b/tests/run_agent/test_compress_focus_plugin_fallback.py index 7b443a9919..f9c1b83dcc 100644 --- a/tests/run_agent/test_compress_focus_plugin_fallback.py +++ b/tests/run_agent/test_compress_focus_plugin_fallback.py @@ -31,7 +31,6 @@ def _make_agent_with_engine(engine): agent._vprint = lambda *a, **kw: None agent._last_flushed_db_idx = 0 # Stub the few AIAgent methods _compress_context uses. 
- agent.flush_memories = lambda *a, **kw: None agent._invalidate_system_prompt = lambda *a, **kw: None agent._build_system_prompt = lambda *a, **kw: "new-system-prompt" agent.commit_memory_session = lambda *a, **kw: None diff --git a/tests/run_agent/test_compression_feasibility.py b/tests/run_agent/test_compression_feasibility.py index 7b4b7da612..f935821ada 100644 --- a/tests/run_agent/test_compression_feasibility.py +++ b/tests/run_agent/test_compression_feasibility.py @@ -41,8 +41,6 @@ def _make_agent( agent.tool_progress_callback = None agent._compression_warning = None agent._aux_compression_context_length_config = None - # Tools feed into the headroom calculation in _check_compression_model_feasibility. - # Tests that want to assert specific threshold values can override this. agent.tools = [] compressor = MagicMock(spec=ContextCompressor) @@ -85,9 +83,8 @@ def test_auto_corrects_threshold_when_aux_context_below_threshold(mock_get_clien assert "threshold:" in messages[0] # Warning stored for gateway replay assert agent._compression_warning is not None - # Threshold on the live compressor was actually lowered, accounting for - # the request-overhead headroom (empty tools list → ~12K headroom only). - assert agent.context_compressor.threshold_tokens == 68_000 + # Threshold on the live compressor was actually lowered to aux_context. 
+ assert agent.context_compressor.threshold_tokens == 80_000 @patch("agent.model_metadata.get_model_context_length", return_value=32_768) @@ -346,93 +343,7 @@ def test_just_below_threshold_auto_corrects(mock_get_client, mock_ctx_len): assert len(messages) == 1 assert "small-model" in messages[0] assert "Auto-lowered" in messages[0] - assert agent.context_compressor.threshold_tokens == 87_999 - - -# ── Headroom for system prompt + tool schemas ──────────────────────── - - -@patch("agent.model_metadata.get_model_context_length", return_value=128_000) -@patch("agent.auxiliary_client.get_text_auxiliary_client") -def test_auto_lowered_threshold_reserves_headroom_for_tools_and_system(mock_get_client, mock_ctx_len): - """When aux context binds the threshold, new_threshold must leave room - for the system prompt and tool schemas that auxiliary callers - (compression summariser, flush_memories) prepend to the message list. - - Without headroom, a full-budget message window + ~25K system/tool - overhead overflows the aux model with HTTP 400. Regression guard for - the flush_memories-on-busy-toolset overflow path. - """ - # Main context 200K, threshold 70% = 140K. Aux pins at 128K (below - # threshold → triggers auto-correct). - agent = _make_agent(main_context=200_000, threshold_percent=0.70) - - # Build a realistic tool schema load. 
- agent.tools = [ - { - "type": "function", - "function": { - "name": f"tool_{i}", - "description": "x" * 200, - "parameters": {"type": "object", "properties": {"arg": {"type": "string", "description": "y" * 120}}}, - }, - } - for i in range(50) - ] - - mock_client = MagicMock() - mock_client.base_url = "https://openrouter.ai/api/v1" - mock_client.api_key = "sk-aux" - mock_get_client.return_value = (mock_client, "model-with-128k") - - agent._emit_status = lambda msg: None - agent._check_compression_model_feasibility() - - new_threshold = agent.context_compressor.threshold_tokens - - # Must have strictly reserved headroom: new_threshold < aux_context. - assert new_threshold < 128_000, ( - f"threshold {new_threshold} did not reserve headroom below aux=128,000 " - f"— system prompt + tools would overflow the aux model" - ) - # Must respect the 64K hard floor. - from agent.model_metadata import MINIMUM_CONTEXT_LENGTH - assert new_threshold >= MINIMUM_CONTEXT_LENGTH - - -@patch("agent.model_metadata.get_model_context_length", return_value=80_000) -@patch("agent.auxiliary_client.get_text_auxiliary_client") -def test_headroom_floors_at_minimum_context(mock_get_client, mock_ctx_len): - """If headroom subtraction would push below 64K floor, clamp to 64K - rather than refusing the session — the aux is still workable for a - smaller message window. - """ - # Aux at 80K, with enough tools to push headroom > 16K → naive subtract - # would land at < 64K. The max(..., MINIMUM_CONTEXT_LENGTH) clamp must - # keep the session running. 
- agent = _make_agent(main_context=200_000, threshold_percent=0.50) - agent.tools = [ - { - "type": "function", - "function": { - "name": f"tool_{i}", - "description": "z" * 2_000, # fat descriptions - "parameters": {}, - }, - } - for i in range(30) - ] - - mock_client = MagicMock() - mock_client.base_url = "https://openrouter.ai/api/v1" - mock_client.api_key = "sk-aux" - mock_get_client.return_value = (mock_client, "small-aux-model") - - agent._emit_status = lambda msg: None - agent._check_compression_model_feasibility() - - from agent.model_metadata import MINIMUM_CONTEXT_LENGTH - assert agent.context_compressor.threshold_tokens == MINIMUM_CONTEXT_LENGTH + assert agent.context_compressor.threshold_tokens == 99_999 # ── Two-phase: __init__ + run_conversation replay ─────────────────── diff --git a/tests/run_agent/test_flush_memories_codex.py b/tests/run_agent/test_flush_memories_codex.py deleted file mode 100644 index 9863235bf9..0000000000 --- a/tests/run_agent/test_flush_memories_codex.py +++ /dev/null @@ -1,398 +0,0 @@ -"""Tests for flush_memories() working correctly across all provider modes. - -Catches the bug where Codex mode called chat.completions.create on a -Responses-only client, which would fail silently or with a 404. 
-""" - -import json -import os -import sys -import types -from types import SimpleNamespace -from unittest.mock import patch, MagicMock, call - -import pytest - -sys.modules.setdefault("fire", types.SimpleNamespace(Fire=lambda *a, **k: None)) -sys.modules.setdefault("firecrawl", types.SimpleNamespace(Firecrawl=object)) -sys.modules.setdefault("fal_client", types.SimpleNamespace()) - -import run_agent - - -class _FakeOpenAI: - def __init__(self, **kwargs): - self.kwargs = kwargs - self.api_key = kwargs.get("api_key", "test") - self.base_url = kwargs.get("base_url", "http://test") - - def close(self): - pass - - -def _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter"): - """Build an AIAgent with mocked internals, ready for flush_memories testing.""" - monkeypatch.setattr(run_agent, "get_tool_definitions", lambda **kw: [ - { - "type": "function", - "function": { - "name": "memory", - "description": "Manage memories.", - "parameters": { - "type": "object", - "properties": { - "action": {"type": "string"}, - "target": {"type": "string"}, - "content": {"type": "string"}, - }, - }, - }, - }, - ]) - monkeypatch.setattr(run_agent, "check_toolset_requirements", lambda: {}) - monkeypatch.setattr(run_agent, "OpenAI", _FakeOpenAI) - - agent = run_agent.AIAgent( - api_key="test-key", - base_url="https://test.example.com/v1", - provider=provider, - api_mode=api_mode, - max_iterations=4, - quiet_mode=True, - skip_context_files=True, - skip_memory=True, - ) - # Give it a valid memory store - agent._memory_store = MagicMock() - agent._memory_flush_min_turns = 1 - agent._user_turn_count = 5 - return agent - - -def _chat_response_with_memory_call(): - """Simulated chat completions response with a memory tool call.""" - return SimpleNamespace( - choices=[SimpleNamespace( - finish_reason="tool_calls", - message=SimpleNamespace( - content=None, - tool_calls=[SimpleNamespace( - id="call_mem_0", - type="function", - function=SimpleNamespace( - name="memory", - 
arguments=json.dumps({ - "action": "add", - "target": "notes", - "content": "User prefers dark mode.", - }), - ), - )], - ), - )], - usage=SimpleNamespace(prompt_tokens=100, completion_tokens=20, total_tokens=120), - ) - - -class TestFlushMemoriesRespectsConfigTimeout: - """flush_memories() must NOT hardcode timeout=30.0 — it should defer - to the config value via auxiliary.flush_memories.timeout.""" - - def test_auxiliary_path_omits_explicit_timeout(self, monkeypatch): - """When calling _call_llm, timeout should NOT be passed so that - _get_task_timeout('flush_memories') reads from config.""" - agent = _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter") - - mock_response = _chat_response_with_memory_call() - - with patch("agent.auxiliary_client.call_llm", return_value=mock_response) as mock_call: - messages = [ - {"role": "user", "content": "Hello"}, - {"role": "assistant", "content": "Hi"}, - {"role": "user", "content": "Note this"}, - ] - with patch("tools.memory_tool.memory_tool", return_value="Saved."): - agent.flush_memories(messages) - - mock_call.assert_called_once() - call_kwargs = mock_call.call_args - # timeout must NOT be explicitly passed (so _get_task_timeout resolves it) - assert "timeout" not in call_kwargs.kwargs, ( - "flush_memories should not pass explicit timeout to _call_llm; " - "let _get_task_timeout('flush_memories') resolve from config" - ) - - def test_fallback_path_uses_config_timeout(self, monkeypatch): - """When auxiliary client is unavailable and we fall back to direct - OpenAI client, timeout should come from _get_task_timeout, not hardcoded.""" - agent = _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter") - agent.client = MagicMock() - agent.client.chat.completions.create.return_value = _chat_response_with_memory_call() - - custom_timeout = 180.0 - - with patch("agent.auxiliary_client.call_llm", side_effect=RuntimeError("no provider")), \ - 
patch("agent.auxiliary_client._get_task_timeout", return_value=custom_timeout) as mock_gtt, \ - patch("tools.memory_tool.memory_tool", return_value="Saved."): - messages = [ - {"role": "user", "content": "Hello"}, - {"role": "assistant", "content": "Hi"}, - {"role": "user", "content": "Save this"}, - ] - agent.flush_memories(messages) - - mock_gtt.assert_called_once_with("flush_memories") - agent.client.chat.completions.create.assert_called_once() - call_kwargs = agent.client.chat.completions.create.call_args - assert call_kwargs.kwargs.get("timeout") == custom_timeout, ( - f"Expected timeout={custom_timeout} from config, got {call_kwargs.kwargs.get('timeout')}" - ) - - -class TestFlushMemoriesUsesAuxiliaryClient: - """When an auxiliary client is available, flush_memories should use it - instead of self.client -- especially critical in Codex mode.""" - - def test_flush_uses_auxiliary_when_available(self, monkeypatch): - agent = _make_agent(monkeypatch, api_mode="codex_responses", provider="openai-codex") - - mock_response = _chat_response_with_memory_call() - - with patch("agent.auxiliary_client.call_llm", return_value=mock_response) as mock_call: - messages = [ - {"role": "user", "content": "Hello"}, - {"role": "assistant", "content": "Hi there"}, - {"role": "user", "content": "Remember this"}, - ] - with patch("tools.memory_tool.memory_tool", return_value="Saved.") as mock_memory: - agent.flush_memories(messages) - - mock_call.assert_called_once() - call_kwargs = mock_call.call_args - assert call_kwargs.kwargs.get("task") == "flush_memories" - - def test_flush_uses_main_client_when_no_auxiliary(self, monkeypatch): - """Non-Codex mode with no auxiliary falls back to self.client.""" - agent = _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter") - agent.client = MagicMock() - agent.client.chat.completions.create.return_value = _chat_response_with_memory_call() - - with patch("agent.auxiliary_client.call_llm", side_effect=RuntimeError("no 
provider")): - messages = [ - {"role": "user", "content": "Hello"}, - {"role": "assistant", "content": "Hi there"}, - {"role": "user", "content": "Save this"}, - ] - with patch("tools.memory_tool.memory_tool", return_value="Saved."): - agent.flush_memories(messages) - - agent.client.chat.completions.create.assert_called_once() - - def test_auxiliary_provider_failure_surfaces_warning_and_falls_back(self, monkeypatch): - """Provider/API failures from auxiliary flush must be visible. - - Exhausted keys and rate limits are not always RuntimeError. They used - to fall into the broad outer handler and disappear into debug logs. - """ - agent = _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter") - agent.client = MagicMock() - agent.client.chat.completions.create.return_value = _chat_response_with_memory_call() - events = [] - agent.status_callback = lambda kind, text=None: events.append((kind, text)) - - with patch("agent.auxiliary_client.call_llm", side_effect=Exception("opencode-go key exhausted")), \ - patch("tools.memory_tool.memory_tool", return_value="Saved."): - messages = [ - {"role": "user", "content": "Hello"}, - {"role": "assistant", "content": "Hi there"}, - {"role": "user", "content": "Save this"}, - ] - agent.flush_memories(messages) - - agent.client.chat.completions.create.assert_called_once() - assert any(kind == "warn" and "Auxiliary memory flush failed" in text for kind, text in events) - - def test_flush_executes_memory_tool_calls(self, monkeypatch): - """Verify that memory tool calls from the flush response actually get executed.""" - agent = _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter") - - mock_response = _chat_response_with_memory_call() - - with patch("agent.auxiliary_client.call_llm", return_value=mock_response): - messages = [ - {"role": "user", "content": "Hello"}, - {"role": "assistant", "content": "Hi"}, - {"role": "user", "content": "Note this"}, - ] - with 
patch("tools.memory_tool.memory_tool", return_value="Saved.") as mock_memory: - agent.flush_memories(messages) - - mock_memory.assert_called_once() - call_kwargs = mock_memory.call_args - assert call_kwargs.kwargs["action"] == "add" - assert call_kwargs.kwargs["target"] == "notes" - assert "dark mode" in call_kwargs.kwargs["content"] - - def test_flush_bridges_memory_write_metadata(self, monkeypatch): - """Flush memory writes notify external providers with flush provenance.""" - agent = _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter") - agent._memory_manager = MagicMock() - agent.session_id = "sess-flush" - agent.platform = "cli" - - mock_response = _chat_response_with_memory_call() - - with patch("agent.auxiliary_client.call_llm", return_value=mock_response): - messages = [ - {"role": "user", "content": "Hello"}, - {"role": "assistant", "content": "Hi"}, - {"role": "user", "content": "Note this"}, - ] - with patch("tools.memory_tool.memory_tool", return_value="Saved."): - agent.flush_memories(messages) - - agent._memory_manager.on_memory_write.assert_called_once() - call_kwargs = agent._memory_manager.on_memory_write.call_args - assert call_kwargs.args[:3] == ("add", "notes", "User prefers dark mode.") - assert call_kwargs.kwargs["metadata"]["write_origin"] == "memory_flush" - assert call_kwargs.kwargs["metadata"]["execution_context"] == "flush_memories" - assert call_kwargs.kwargs["metadata"]["session_id"] == "sess-flush" - - def test_flush_strips_artifacts_from_messages(self, monkeypatch): - """After flush, the flush prompt and any response should be removed from messages.""" - agent = _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter") - - mock_response = _chat_response_with_memory_call() - - with patch("agent.auxiliary_client.call_llm", return_value=mock_response): - messages = [ - {"role": "user", "content": "Hello"}, - {"role": "assistant", "content": "Hi"}, - {"role": "user", "content": "Remember X"}, - ] 
- original_len = len(messages) - with patch("tools.memory_tool.memory_tool", return_value="Saved."): - agent.flush_memories(messages) - - # Messages should not grow from the flush - assert len(messages) <= original_len - # No flush sentinel should remain - for msg in messages: - assert "_flush_sentinel" not in msg - - -class TestFlushMemoriesCodexFallback: - """When no auxiliary client exists and we're in Codex mode, flush should - use the Codex Responses API path instead of chat.completions.""" - - def test_codex_mode_no_aux_uses_responses_api(self, monkeypatch): - agent = _make_agent(monkeypatch, api_mode="codex_responses", provider="openai-codex") - - codex_response = SimpleNamespace( - output=[ - SimpleNamespace( - type="function_call", - call_id="call_1", - name="memory", - arguments=json.dumps({ - "action": "add", - "target": "notes", - "content": "Codex flush test", - }), - ), - ], - usage=SimpleNamespace(input_tokens=50, output_tokens=10, total_tokens=60), - status="completed", - model="gpt-5-codex", - ) - - with patch("agent.auxiliary_client.call_llm", side_effect=RuntimeError("no provider")), \ - patch.object(agent, "_run_codex_stream", return_value=codex_response) as mock_stream, \ - patch.object(agent, "_build_api_kwargs") as mock_build, \ - patch("tools.memory_tool.memory_tool", return_value="Saved.") as mock_memory: - mock_build.return_value = { - "model": "gpt-5-codex", - "instructions": "test", - "input": [], - "tools": [], - "max_output_tokens": 4096, - } - messages = [ - {"role": "user", "content": "Hello"}, - {"role": "assistant", "content": "Hi"}, - {"role": "user", "content": "Save this"}, - ] - agent.flush_memories(messages) - - mock_stream.assert_called_once() - mock_memory.assert_called_once() - assert mock_memory.call_args.kwargs["content"] == "Codex flush test" - - @pytest.mark.parametrize( - "provider,base_url", - [ - # chatgpt.com/backend-api/codex — rejects temperature unconditionally - ("openai-codex", 
"https://chatgpt.com/backend-api/codex"), - # Native OpenAI Responses — rejects temperature on gpt-5/o-series reasoning models - ("openai", "https://api.openai.com/v1"), - # Copilot Responses — rejects temperature on reasoning models - ("copilot", "https://api.githubcopilot.com"), - ], - ) - def test_codex_fallback_never_sends_temperature(self, monkeypatch, provider, base_url): - """Regression for the ``⚠ Auxiliary memory flush failed: HTTP 400: - Unsupported parameter: temperature`` error. - - The codex_responses fallback must strip temperature before calling - _run_codex_stream — the Responses API does not accept it on any - supported backend, matching the transport's behavior.""" - agent = _make_agent(monkeypatch, api_mode="codex_responses", provider=provider) - agent.base_url = base_url - - codex_response = SimpleNamespace( - output=[ - SimpleNamespace( - type="function_call", - call_id="call_1", - name="memory", - arguments=json.dumps({ - "action": "add", - "target": "notes", - "content": "no-temp test", - }), - ), - ], - usage=SimpleNamespace(input_tokens=50, output_tokens=10, total_tokens=60), - status="completed", - model="gpt-5.5", - ) - - with patch("agent.auxiliary_client.call_llm", side_effect=RuntimeError("no provider")), \ - patch.object(agent, "_run_codex_stream", return_value=codex_response) as mock_stream, \ - patch.object(agent, "_build_api_kwargs") as mock_build, \ - patch("tools.memory_tool.memory_tool", return_value="Saved."): - # Simulate a transport that (correctly) never includes temperature, - # but also verify we strip any stray temperature the fallback used - # to inject before the fix. 
- mock_build.return_value = { - "model": "gpt-5.5", - "instructions": "test", - "input": [], - "tools": [], - "max_output_tokens": 4096, - # Intentionally poison the dict to prove we pop it: - "temperature": 0.3, - } - messages = [ - {"role": "user", "content": "Hello"}, - {"role": "assistant", "content": "Hi"}, - {"role": "user", "content": "Save this"}, - ] - agent.flush_memories(messages) - - mock_stream.assert_called_once() - sent_kwargs = mock_stream.call_args.args[0] - assert "temperature" not in sent_kwargs, ( - f"codex_responses fallback must strip temperature before calling " - f"_run_codex_stream, got: {sent_kwargs.get('temperature')!r}" - ) diff --git a/tests/run_agent/test_run_agent.py b/tests/run_agent/test_run_agent.py index 9c54daffe5..c7b039561b 100644 --- a/tests/run_agent/test_run_agent.py +++ b/tests/run_agent/test_run_agent.py @@ -3078,48 +3078,6 @@ class TestRetryExhaustion: assert "bad messages" in result["error"] -# --------------------------------------------------------------------------- -# Flush sentinel leak -# --------------------------------------------------------------------------- - - -class TestFlushSentinelNotLeaked: - """_flush_sentinel must be stripped before sending messages to the API.""" - - def test_flush_sentinel_stripped_from_api_messages(self, agent_with_memory_tool): - """Verify _flush_sentinel is not sent to the API provider.""" - agent = agent_with_memory_tool - agent._memory_store = MagicMock() - agent._memory_flush_min_turns = 1 - agent._user_turn_count = 10 - agent._cached_system_prompt = "system" - - messages = [ - {"role": "user", "content": "hello"}, - {"role": "assistant", "content": "hi"}, - {"role": "user", "content": "remember this"}, - ] - - # Mock the API to return a simple response (no tool calls) - mock_msg = SimpleNamespace(content="OK", tool_calls=None) - mock_choice = SimpleNamespace(message=mock_msg) - mock_response = SimpleNamespace(choices=[mock_choice]) - 
agent.client.chat.completions.create.return_value = mock_response - - # Bypass auxiliary client so flush uses agent.client directly - with patch("agent.auxiliary_client.call_llm", side_effect=RuntimeError("no provider")): - agent.flush_memories(messages, min_turns=0) - - # Check what was actually sent to the API - call_args = agent.client.chat.completions.create.call_args - assert call_args is not None, "flush_memories never called the API" - api_messages = call_args.kwargs.get("messages") or call_args[1].get("messages") - for msg in api_messages: - assert "_flush_sentinel" not in msg, ( - f"_flush_sentinel leaked to API in message: {msg}" - ) - - # --------------------------------------------------------------------------- # Conversation history mutation # --------------------------------------------------------------------------- diff --git a/website/docs/user-guide/configuration.md b/website/docs/user-guide/configuration.md index 80f5c6f88b..16e0097e79 100644 --- a/website/docs/user-guide/configuration.md +++ b/website/docs/user-guide/configuration.md @@ -721,14 +721,6 @@ auxiliary: base_url: "" api_key: "" timeout: 30 - - # Memory flush — summarizes conversation for persistent memory - flush_memories: - provider: "auto" - model: "" - base_url: "" - api_key: "" - timeout: 30 ``` :::tip diff --git a/website/docs/user-guide/features/fallback-providers.md b/website/docs/user-guide/features/fallback-providers.md index 859be00c99..9ecefb0d03 100644 --- a/website/docs/user-guide/features/fallback-providers.md +++ b/website/docs/user-guide/features/fallback-providers.md @@ -168,7 +168,6 @@ Hermes uses separate lightweight models for side tasks. 
Each task has its own pr | Session Search | Past session summarization | `auxiliary.session_search` | | Skills Hub | Skill search and discovery | `auxiliary.skills_hub` | | MCP | MCP helper operations | `auxiliary.mcp` | -| Memory Flush | Memory consolidation | `auxiliary.flush_memories` | | Approval | Smart command-approval classification | `auxiliary.approval` | | Title Generation | Session title summaries | `auxiliary.title_generation` | @@ -226,10 +225,6 @@ auxiliary: mcp: provider: "auto" model: "" - - flush_memories: - provider: "auto" - model: "" ``` Every task above follows the same **provider / model / base_url** pattern. Context compression is configured under `auxiliary.compression`: @@ -365,7 +360,6 @@ See [Scheduled Tasks (Cron)](/docs/user-guide/features/cron) for full configurat | Session search | Auto-detection chain | `auxiliary.session_search` | | Skills hub | Auto-detection chain | `auxiliary.skills_hub` | | MCP helpers | Auto-detection chain | `auxiliary.mcp` | -| Memory flush | Auto-detection chain | `auxiliary.flush_memories` | | Approval classification | Auto-detection chain | `auxiliary.approval` | | Title generation | Auto-detection chain | `auxiliary.title_generation` | | Delegation | Provider override only (no automatic fallback) | `delegation.provider` / `delegation.model` |