fix: recover hindsight embedded daemon after idle shutdown

This commit is contained in:
Wysie
2026-04-26 00:57:24 +08:00
committed by Teknium
parent 7317d69f19
commit 0ba6471dd1
2 changed files with 187 additions and 28 deletions

View File

@@ -3,7 +3,9 @@
Long-term memory with knowledge graph, entity resolution, and multi-strategy
retrieval. Supports cloud (API key) and local modes.
Configurable timeout via HINDSIGHT_TIMEOUT env var or config.json.
Configurable request timeout via HINDSIGHT_TIMEOUT env var or config.json.
Configurable embedded daemon idle timeout via HINDSIGHT_IDLE_TIMEOUT env var
or config.json idle_timeout.
Original PR #1811 by benfrank241, adapted to MemoryProvider ABC.
@@ -14,6 +16,7 @@ Config via environment variables:
HINDSIGHT_API_URL — API endpoint
HINDSIGHT_MODE — cloud or local (default: cloud)
HINDSIGHT_TIMEOUT — API request timeout in seconds (default: 120)
HINDSIGHT_IDLE_TIMEOUT — embedded daemon idle timeout in seconds; 0 disables idle shutdown (default: 300)
HINDSIGHT_RETAIN_TAGS — comma-separated tags attached to retained memories
HINDSIGHT_RETAIN_SOURCE — metadata source value attached to retained memories
HINDSIGHT_RETAIN_USER_PREFIX — label used before user turns in retained transcripts
@@ -45,6 +48,7 @@ _DEFAULT_API_URL = "https://api.hindsight.vectorize.io"
_DEFAULT_LOCAL_URL = "http://localhost:8888"
_MIN_CLIENT_VERSION = "0.4.22"
_DEFAULT_TIMEOUT = 120 # seconds — cloud API can take 30-40s per request
_DEFAULT_IDLE_TIMEOUT = 300 # seconds — Hindsight embedded daemon default
_VALID_BUDGETS = {"low", "mid", "high"}
_PROVIDER_DEFAULT_MODELS = {
"openai": "gpt-4o-mini",
@@ -59,6 +63,17 @@ _PROVIDER_DEFAULT_MODELS = {
}
def _parse_int_setting(value: Any, default: int) -> int:
    """Parse an integer config/env value, falling back on invalid input.

    Args:
        value: Raw setting as read from config or the environment. ``None``
            and the empty string are treated as "not set".
        default: Value returned when *value* is unset or cannot be converted.

    Returns:
        ``int(value)`` when the conversion succeeds, otherwise *default*.
        A warning is logged when a non-empty value is rejected.
    """
    if value is None or value == "":
        return default
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers non-finite floats: int(float("inf")) raises it
        # rather than ValueError, and a bad setting must never crash startup.
        logger.warning("Invalid integer Hindsight setting %r; using default %s", value, default)
        return default
def _check_local_runtime() -> tuple[bool, str | None]:
"""Return whether local embedded Hindsight imports cleanly.
@@ -203,6 +218,8 @@ def _load_config() -> dict:
return {
"mode": os.environ.get("HINDSIGHT_MODE", "cloud"),
"apiKey": os.environ.get("HINDSIGHT_API_KEY", ""),
"timeout": _parse_int_setting(os.environ.get("HINDSIGHT_TIMEOUT"), _DEFAULT_TIMEOUT),
"idle_timeout": _parse_int_setting(os.environ.get("HINDSIGHT_IDLE_TIMEOUT"), _DEFAULT_IDLE_TIMEOUT),
"retain_tags": os.environ.get("HINDSIGHT_RETAIN_TAGS", ""),
"retain_source": os.environ.get("HINDSIGHT_RETAIN_SOURCE", ""),
"retain_user_prefix": os.environ.get("HINDSIGHT_RETAIN_USER_PREFIX", "User"),
@@ -304,6 +321,16 @@ def _build_embedded_profile_env(config: dict[str, Any], *, llm_api_key: str | No
}
if current_base_url:
env_values["HINDSIGHT_API_LLM_BASE_URL"] = str(current_base_url)
idle_timeout = (
config.get("idle_timeout")
if config.get("idle_timeout") is not None
else os.environ.get("HINDSIGHT_IDLE_TIMEOUT")
)
if idle_timeout is not None and idle_timeout != "":
env_values["HINDSIGHT_EMBED_DAEMON_IDLE_TIMEOUT"] = str(
_parse_int_setting(idle_timeout, _DEFAULT_IDLE_TIMEOUT)
)
return env_values
@@ -412,6 +439,7 @@ class HindsightMemoryProvider(MemoryProvider):
self._turn_index = 0
self._client = None
self._timeout = _DEFAULT_TIMEOUT
self._idle_timeout = _DEFAULT_IDLE_TIMEOUT
self._prefetch_result = ""
self._prefetch_lock = threading.Lock()
self._prefetch_thread = None
@@ -592,10 +620,17 @@ class HindsightMemoryProvider(MemoryProvider):
sys.stdout.write(" LLM API key: ")
sys.stdout.flush()
llm_key = getpass.getpass(prompt="") if sys.stdin.isatty() else sys.stdin.readline().strip()
# Always write explicitly (including empty) so the provider sees ""
# rather than a missing variable. The daemon reads from .env at
# startup and fails when HINDSIGHT_LLM_API_KEY is unset.
env_writes["HINDSIGHT_LLM_API_KEY"] = llm_key
if llm_key:
env_writes["HINDSIGHT_LLM_API_KEY"] = llm_key
else:
env_path = Path(hermes_home) / ".env"
existing_llm_key = ""
if env_path.exists():
for line in env_path.read_text().splitlines():
if line.startswith("HINDSIGHT_LLM_API_KEY="):
existing_llm_key = line.split("=", 1)[1]
break
env_writes["HINDSIGHT_LLM_API_KEY"] = existing_llm_key
# Step 4: Save everything
provider_config["bank_id"] = "hermes"
@@ -605,6 +640,11 @@ class HindsightMemoryProvider(MemoryProvider):
timeout_val = existing_timeout if existing_timeout else _DEFAULT_TIMEOUT
provider_config["timeout"] = timeout_val
env_writes["HINDSIGHT_TIMEOUT"] = str(timeout_val)
if mode == "local_embedded":
existing_idle_timeout = self._config.get("idle_timeout") if self._config else None
idle_timeout_val = existing_idle_timeout if existing_idle_timeout is not None else _DEFAULT_IDLE_TIMEOUT
provider_config["idle_timeout"] = idle_timeout_val
env_writes["HINDSIGHT_IDLE_TIMEOUT"] = str(idle_timeout_val)
config["memory"]["provider"] = "hindsight"
save_config(config)
@@ -693,6 +733,7 @@ class HindsightMemoryProvider(MemoryProvider):
{"key": "recall_max_input_chars", "description": "Maximum input query length for auto-recall", "default": 800},
{"key": "recall_prompt_preamble", "description": "Custom preamble for recalled memories in context"},
{"key": "timeout", "description": "API request timeout in seconds", "default": _DEFAULT_TIMEOUT},
{"key": "idle_timeout", "description": "Embedded daemon idle timeout in seconds (0 disables auto-shutdown)", "default": _DEFAULT_IDLE_TIMEOUT, "when": {"mode": "local_embedded"}},
]
def _get_client(self):
@@ -720,6 +761,14 @@ class HindsightMemoryProvider(MemoryProvider):
)
if self._llm_base_url:
kwargs["llm_base_url"] = self._llm_base_url
idle_timeout = _parse_int_setting(
self._config.get("idle_timeout")
if self._config.get("idle_timeout") is not None
else os.environ.get("HINDSIGHT_IDLE_TIMEOUT", self._idle_timeout),
_DEFAULT_IDLE_TIMEOUT,
)
self._idle_timeout = idle_timeout
kwargs["idle_timeout"] = idle_timeout
self._client = HindsightEmbedded(**kwargs)
else:
from hindsight_client import Hindsight
@@ -736,6 +785,38 @@ class HindsightMemoryProvider(MemoryProvider):
"""Schedule *coro* on the shared loop using the configured timeout."""
return _run_sync(coro, timeout=self._timeout)
def _is_retriable_embedded_connection_error(self, exc: Exception) -> bool:
    """Report whether *exc* looks like a failed connection to the embedded daemon.

    Always ``False`` unless the provider mode is ``local_embedded``. In that
    mode the exception's class name and message are lower-cased and searched
    for a fixed set of connection-failure markers.
    """
    if self._mode != "local_embedded":
        return False

    connection_markers = (
        "cannot connect to host",
        "connection refused",
        "connect call failed",
        "clientconnectorerror",
    )
    # Match on "<ClassName>: <message>" so either part can carry the marker.
    haystack = f"{type(exc).__name__}: {exc}".lower()
    for needle in connection_markers:
        if needle in haystack:
            return True
    return False
def _run_hindsight_operation(self, operation):
    """Run a Hindsight client operation, retrying once on a stale embedded connection.

    *operation* is called with the current client and its result is passed to
    ``_run_sync``. When that raises an error accepted by
    ``_is_retriable_embedded_connection_error``, the cached client is
    discarded, a new one is requested from ``_get_client``, and the operation
    is attempted exactly one more time. Any other error, and any error from
    the second attempt, propagates to the caller.
    """
    current = self._get_client()
    try:
        return self._run_sync(operation(current))
    except Exception as exc:
        retriable = self._is_retriable_embedded_connection_error(exc)
        if not retriable:
            raise
        logger.info(
            "Hindsight embedded daemon appears unreachable; recreating client and retrying once: %s",
            exc,
        )
        # Clear the cached client first; presumably this makes _get_client
        # build a fresh one instead of handing back the stale instance.
        self._client = None
        replacement = self._get_client()
        self._client = replacement
        return self._run_sync(operation(replacement))
def initialize(self, session_id: str, **kwargs) -> None:
self._session_id = str(session_id or "").strip()
self._parent_session_id = str(kwargs.get("parent_session_id", "") or "").strip()
@@ -790,7 +871,14 @@ class HindsightMemoryProvider(MemoryProvider):
self._session_turns = []
self._mode = self._config.get("mode", "cloud")
# Read timeout from config or env var, fall back to default
self._timeout = self._config.get("timeout") or int(os.environ.get("HINDSIGHT_TIMEOUT", str(_DEFAULT_TIMEOUT)))
self._timeout = _parse_int_setting(
self._config.get("timeout") if self._config.get("timeout") is not None else os.environ.get("HINDSIGHT_TIMEOUT"),
_DEFAULT_TIMEOUT,
)
self._idle_timeout = _parse_int_setting(
self._config.get("idle_timeout") if self._config.get("idle_timeout") is not None else os.environ.get("HINDSIGHT_IDLE_TIMEOUT"),
_DEFAULT_IDLE_TIMEOUT,
)
# "local" is a legacy alias for "local_embedded"
if self._mode == "local":
self._mode = "local_embedded"
@@ -981,10 +1069,9 @@ class HindsightMemoryProvider(MemoryProvider):
def _run():
try:
client = self._get_client()
if self._prefetch_method == "reflect":
logger.debug("Prefetch: calling reflect (bank=%s, query_len=%d)", self._bank_id, len(query))
resp = self._run_sync(client.areflect(bank_id=self._bank_id, query=query, budget=self._budget))
resp = self._run_hindsight_operation(lambda client: client.areflect(bank_id=self._bank_id, query=query, budget=self._budget))
text = resp.text or ""
else:
recall_kwargs: dict = {
@@ -998,7 +1085,7 @@ class HindsightMemoryProvider(MemoryProvider):
recall_kwargs["types"] = self._recall_types
logger.debug("Prefetch: calling recall (bank=%s, query_len=%d, budget=%s)",
self._bank_id, len(query), self._budget)
resp = self._run_sync(client.arecall(**recall_kwargs))
resp = self._run_hindsight_operation(lambda client: client.arecall(**recall_kwargs))
num_results = len(resp.results) if resp.results else 0
logger.debug("Prefetch: recall returned %d results", num_results)
text = "\n".join(f"- {r.text}" for r in resp.results if r.text) if resp.results else ""
@@ -1131,12 +1218,14 @@ class HindsightMemoryProvider(MemoryProvider):
item.pop("retain_async", None)
logger.debug("Hindsight retain: bank=%s, doc=%s, async=%s, content_len=%d, num_turns=%d",
self._bank_id, self._document_id, self._retain_async, len(content), len(self._session_turns))
self._run_sync(client.aretain_batch(
bank_id=self._bank_id,
items=[item],
document_id=self._document_id,
retain_async=self._retain_async,
))
self._run_hindsight_operation(
lambda client: client.aretain_batch(
bank_id=self._bank_id,
items=[item],
document_id=self._document_id,
retain_async=self._retain_async,
)
)
logger.debug("Hindsight retain succeeded")
except Exception as e:
logger.warning("Hindsight sync failed: %s", e, exc_info=True)
@@ -1152,12 +1241,6 @@ class HindsightMemoryProvider(MemoryProvider):
return [RETAIN_SCHEMA, RECALL_SCHEMA, REFLECT_SCHEMA]
def handle_tool_call(self, tool_name: str, args: dict, **kwargs) -> str:
try:
client = self._get_client()
except Exception as e:
logger.warning("Hindsight client init failed: %s", e)
return tool_error(f"Hindsight client unavailable: {e}")
if tool_name == "hindsight_retain":
content = args.get("content", "")
if not content:
@@ -1171,7 +1254,7 @@ class HindsightMemoryProvider(MemoryProvider):
)
logger.debug("Tool hindsight_retain: bank=%s, content_len=%d, context=%s",
self._bank_id, len(content), context)
self._run_sync(client.aretain(**retain_kwargs))
self._run_hindsight_operation(lambda client: client.aretain(**retain_kwargs))
logger.debug("Tool hindsight_retain: success")
return json.dumps({"result": "Memory stored successfully."})
except Exception as e:
@@ -1194,7 +1277,7 @@ class HindsightMemoryProvider(MemoryProvider):
recall_kwargs["types"] = self._recall_types
logger.debug("Tool hindsight_recall: bank=%s, query_len=%d, budget=%s",
self._bank_id, len(query), self._budget)
resp = self._run_sync(client.arecall(**recall_kwargs))
resp = self._run_hindsight_operation(lambda client: client.arecall(**recall_kwargs))
num_results = len(resp.results) if resp.results else 0
logger.debug("Tool hindsight_recall: %d results", num_results)
if not resp.results:
@@ -1212,9 +1295,11 @@ class HindsightMemoryProvider(MemoryProvider):
try:
logger.debug("Tool hindsight_reflect: bank=%s, query_len=%d, budget=%s",
self._bank_id, len(query), self._budget)
resp = self._run_sync(client.areflect(
bank_id=self._bank_id, query=query, budget=self._budget
))
resp = self._run_hindsight_operation(
lambda client: client.areflect(
bank_id=self._bank_id, query=query, budget=self._budget
)
)
logger.debug("Tool hindsight_reflect: response_len=%d", len(resp.text or ""))
return json.dumps({"result": resp.text or "No relevant memories found."})
except Exception as e:

View File

@@ -7,6 +7,7 @@ turn counting, tags), and schema completeness.
import json
import re
import sys
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
@@ -18,6 +19,7 @@ from plugins.memory.hindsight import (
REFLECT_SCHEMA,
RETAIN_SCHEMA,
_load_config,
_build_embedded_profile_env,
_normalize_retain_tags,
_resolve_bank_id_template,
_sanitize_bank_segment,
@@ -34,7 +36,8 @@ def _clean_env(monkeypatch):
"""Ensure no stale env vars leak between tests."""
for key in (
"HINDSIGHT_API_KEY", "HINDSIGHT_API_URL", "HINDSIGHT_BANK_ID",
"HINDSIGHT_BUDGET", "HINDSIGHT_MODE", "HINDSIGHT_LLM_API_KEY",
"HINDSIGHT_BUDGET", "HINDSIGHT_MODE", "HINDSIGHT_TIMEOUT",
"HINDSIGHT_IDLE_TIMEOUT", "HINDSIGHT_LLM_API_KEY",
"HINDSIGHT_RETAIN_TAGS", "HINDSIGHT_RETAIN_SOURCE",
"HINDSIGHT_RETAIN_USER_PREFIX", "HINDSIGHT_RETAIN_ASSISTANT_PREFIX",
):
@@ -251,6 +254,51 @@ class TestConfig:
assert cfg["banks"]["hermes"]["bankId"] == "env-bank"
assert cfg["banks"]["hermes"]["budget"] == "high"
def test_embedded_profile_env_includes_idle_timeout_from_config(self):
    """An explicit ``idle_timeout`` of 0 in config is emitted as the string "0"."""
    profile_config = {
        "llm_provider": "openai",
        "llm_model": "gpt-4o-mini",
        "idle_timeout": 0,
    }

    profile_env = _build_embedded_profile_env(profile_config)

    assert profile_env["HINDSIGHT_EMBED_DAEMON_IDLE_TIMEOUT"] == "0"
def test_embedded_profile_env_includes_idle_timeout_from_env(self, monkeypatch):
    """With no ``idle_timeout`` in config, HINDSIGHT_IDLE_TIMEOUT supplies the value."""
    monkeypatch.setenv("HINDSIGHT_IDLE_TIMEOUT", "42")
    profile_config = {
        "llm_provider": "openai",
        "llm_model": "gpt-4o-mini",
    }

    profile_env = _build_embedded_profile_env(profile_config)

    assert profile_env["HINDSIGHT_EMBED_DAEMON_IDLE_TIMEOUT"] == "42"
def test_get_client_passes_idle_timeout_to_hindsight_embedded(self, monkeypatch):
    """``_get_client`` forwards the configured ``idle_timeout`` to ``HindsightEmbedded``.

    A stand-in ``hindsight`` module records the constructor kwargs so the test
    can inspect what the provider passed.
    """
    seen_kwargs = {}

    class FakeHindsightEmbedded:
        def __init__(self, **kwargs):
            seen_kwargs.update(kwargs)

    fake_module = SimpleNamespace(HindsightEmbedded=FakeHindsightEmbedded)
    monkeypatch.setitem(sys.modules, "hindsight", fake_module)
    monkeypatch.setattr("plugins.memory.hindsight._check_local_runtime", lambda: (True, ""))

    embedded_provider = HindsightMemoryProvider()
    embedded_provider._mode = "local_embedded"
    embedded_provider._config = {
        "profile": "hermes",
        "llm_provider": "openai_compatible",
        "llm_api_key": "test-key",
        "llm_model": "test-model",
        "idle_timeout": 0,
    }
    embedded_provider._llm_base_url = "http://localhost:8060/v1"

    embedded_provider._get_client()

    assert seen_kwargs["idle_timeout"] == 0
    assert seen_kwargs["llm_provider"] == "openai"
class TestPostSetup:
def test_local_embedded_setup_materializes_profile_env(self, tmp_path, monkeypatch):
@@ -272,7 +320,10 @@ class TestPostSetup:
provider.post_setup(str(hermes_home), {"memory": {}})
assert saved_configs[-1]["memory"]["provider"] == "hindsight"
assert (hermes_home / ".env").read_text() == "HINDSIGHT_LLM_API_KEY=sk-local-test\nHINDSIGHT_TIMEOUT=120\n"
env_text = (hermes_home / ".env").read_text()
assert "HINDSIGHT_LLM_API_KEY=sk-local-test\n" in env_text
assert "HINDSIGHT_TIMEOUT=120\n" in env_text
assert "HINDSIGHT_IDLE_TIMEOUT=300\n" in env_text
profile_env = user_home / ".hindsight" / "profiles" / "hermes.env"
assert profile_env.exists()
@@ -281,6 +332,7 @@ class TestPostSetup:
"HINDSIGHT_API_LLM_API_KEY=sk-local-test\n"
"HINDSIGHT_API_LLM_MODEL=gpt-4o-mini\n"
"HINDSIGHT_API_LOG_LEVEL=info\n"
"HINDSIGHT_EMBED_DAEMON_IDLE_TIMEOUT=300\n"
)
def test_local_embedded_setup_respects_existing_profile_name(self, tmp_path, monkeypatch):
@@ -446,6 +498,28 @@ class TestToolHandlers:
))
assert "error" in result
def test_local_embedded_recall_reconnects_after_idle_shutdown(self, provider, monkeypatch):
    """A recall that hits a connection failure is retried on a replacement client.

    The first client raises a "Cannot connect to host" error; the second
    returns one result. The tool call must succeed, leave the second client
    cached on the provider, and call each client's ``arecall`` exactly once.
    """
    stale_client = _make_mock_client()
    stale_client.arecall.side_effect = RuntimeError("Cannot connect to host 127.0.0.1:8888")

    fresh_client = _make_mock_client()
    recovered = SimpleNamespace(text="Recovered memory")
    fresh_client.arecall.return_value = SimpleNamespace(results=[recovered])

    client_sequence = iter((stale_client, fresh_client))
    provider._mode = "local_embedded"
    provider._client = stale_client
    monkeypatch.setattr(provider, "_get_client", lambda: next(client_sequence))

    raw_response = provider.handle_tool_call("hindsight_recall", {"query": "test"})
    result = json.loads(raw_response)

    assert result["result"] == "1. Recovered memory"
    assert provider._client is fresh_client
    stale_client.arecall.assert_called_once()
    fresh_client.arecall.assert_called_once()
# ---------------------------------------------------------------------------
# Prefetch tests