diff --git a/plugins/hindsight-memory/__init__.py b/plugins/hindsight-memory/__init__.py new file mode 100644 index 00000000000..2d5a058c1fc --- /dev/null +++ b/plugins/hindsight-memory/__init__.py @@ -0,0 +1,304 @@ +"""Hindsight memory plugin — MemoryProvider interface. + +Long-term memory with knowledge graph, entity resolution, and multi-strategy +retrieval. Supports cloud (API key) and local (embedded PostgreSQL) modes. + +Original PR #1811 by benfrank241, adapted to MemoryProvider ABC. + +Config via environment variables: + HINDSIGHT_API_KEY — API key for Hindsight Cloud + HINDSIGHT_BANK_ID — memory bank identifier (default: hermes) + HINDSIGHT_BUDGET — recall budget: low/mid/high (default: mid) + HINDSIGHT_API_URL — API endpoint + HINDSIGHT_MODE — cloud or local (default: cloud) + +Or via ~/.hindsight/config.json (written by the original setup wizard). +""" + +from __future__ import annotations + +import json +import logging +import os +import queue +import threading +from typing import Any, Dict, List + +from agent.memory_provider import MemoryProvider + +logger = logging.getLogger(__name__) + +_DEFAULT_API_URL = "https://api.hindsight.vectorize.io" +_VALID_BUDGETS = {"low", "mid", "high"} + + +# --------------------------------------------------------------------------- +# Thread helper (from original PR — avoids aiohttp event loop conflicts) +# --------------------------------------------------------------------------- + +def _run_in_thread(fn, timeout: float = 30.0): + result_q: queue.Queue = queue.Queue(maxsize=1) + + def _run(): + import asyncio + asyncio.set_event_loop(None) + try: + result_q.put(("ok", fn())) + except Exception as exc: + result_q.put(("err", exc)) + + t = threading.Thread(target=_run, daemon=True, name="hindsight-call") + t.start() + kind, value = result_q.get(timeout=timeout) + if kind == "err": + raise value + return value + + +# --------------------------------------------------------------------------- +# Tool schemas +# 
# OpenAI-style function schemas exposed to the model. Descriptions are part
# of the provider contract — keep wording stable.

RETAIN_SCHEMA = {
    "name": "hindsight_retain",
    "description": (
        "Store information to long-term memory. Hindsight automatically "
        "extracts structured facts, resolves entities, and indexes for retrieval."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "content": {
                "type": "string",
                "description": "The information to store.",
            },
            "context": {
                "type": "string",
                "description": "Short label (e.g. 'user preference', 'project decision').",
            },
        },
        "required": ["content"],
    },
}

RECALL_SCHEMA = {
    "name": "hindsight_recall",
    "description": (
        "Search long-term memory. Returns memories ranked by relevance using "
        "semantic search, keyword matching, entity graph traversal, and reranking."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "What to search for.",
            },
        },
        "required": ["query"],
    },
}

REFLECT_SCHEMA = {
    "name": "hindsight_reflect",
    "description": (
        "Synthesize a reasoned answer from long-term memories. Unlike recall, "
        "this reasons across all stored memories to produce a coherent response."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "The question to reflect on.",
            },
        },
        "required": ["query"],
    },
}
+ ), + "parameters": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "The question to reflect on."}, + }, + "required": ["query"], + }, +} + + +# --------------------------------------------------------------------------- +# Config +# --------------------------------------------------------------------------- + +def _load_config() -> dict: + """Load config from ~/.hindsight/config.json, falling back to env vars.""" + from pathlib import Path + config_path = Path.home() / ".hindsight" / "config.json" + + if config_path.exists(): + try: + return json.loads(config_path.read_text(encoding="utf-8")) + except Exception: + pass + + return { + "mode": os.environ.get("HINDSIGHT_MODE", "cloud"), + "apiKey": os.environ.get("HINDSIGHT_API_KEY", ""), + "banks": { + "hermes": { + "bankId": os.environ.get("HINDSIGHT_BANK_ID", "hermes"), + "budget": os.environ.get("HINDSIGHT_BUDGET", "mid"), + "enabled": True, + } + }, + } + + +# --------------------------------------------------------------------------- +# MemoryProvider implementation +# --------------------------------------------------------------------------- + +class HindsightMemoryProvider(MemoryProvider): + """Hindsight long-term memory with knowledge graph and multi-strategy retrieval.""" + + def __init__(self): + self._config = None + self._api_key = None + self._bank_id = "hermes" + self._budget = "mid" + self._mode = "cloud" + self._prefetch_result = "" + self._prefetch_lock = threading.Lock() + self._prefetch_thread = None + + @property + def name(self) -> str: + return "hindsight" + + def is_available(self) -> bool: + try: + cfg = _load_config() + mode = cfg.get("mode", "cloud") + if mode == "local": + embed = cfg.get("embed", {}) + return bool(embed.get("llmApiKey") or os.environ.get("HINDSIGHT_LLM_API_KEY")) + api_key = cfg.get("apiKey") or os.environ.get("HINDSIGHT_API_KEY", "") + return bool(api_key) + except Exception: + return False + + def _make_client(self): + """Create a 
fresh Hindsight client (thread-safe).""" + if self._mode == "local": + from hindsight import HindsightEmbedded + embed = self._config.get("embed", {}) + return HindsightEmbedded( + profile=embed.get("profile", "hermes"), + llm_provider=embed.get("llmProvider", ""), + llm_api_key=embed.get("llmApiKey", ""), + llm_model=embed.get("llmModel", ""), + ) + from hindsight_client import Hindsight + return Hindsight(api_key=self._api_key, timeout=30.0) + + def initialize(self, session_id: str, **kwargs) -> None: + self._config = _load_config() + self._mode = self._config.get("mode", "cloud") + self._api_key = self._config.get("apiKey") or os.environ.get("HINDSIGHT_API_KEY", "") + + banks = self._config.get("banks", {}).get("hermes", {}) + self._bank_id = banks.get("bankId", "hermes") + budget = banks.get("budget", "mid") + self._budget = budget if budget in _VALID_BUDGETS else "mid" + + # Ensure bank exists + try: + client = _run_in_thread(self._make_client) + _run_in_thread(lambda: client.create_bank(bank_id=self._bank_id, name=self._bank_id)) + except Exception: + pass # Already exists + + def system_prompt_block(self) -> str: + return ( + f"# Hindsight Memory\n" + f"Active. Bank: {self._bank_id}, budget: {self._budget}.\n" + f"Use hindsight_recall to search, hindsight_reflect for synthesis, " + f"hindsight_retain to store facts." 
+ ) + + def prefetch(self, query: str) -> str: + if self._prefetch_thread and self._prefetch_thread.is_alive(): + self._prefetch_thread.join(timeout=3.0) + with self._prefetch_lock: + result = self._prefetch_result + self._prefetch_result = "" + if not result: + return "" + return f"## Hindsight Memory\n{result}" + + def queue_prefetch(self, query: str) -> None: + def _run(): + try: + client = self._make_client() + resp = client.recall(bank_id=self._bank_id, query=query, budget=self._budget) + if resp.results: + text = "\n".join(r.text for r in resp.results if r.text) + with self._prefetch_lock: + self._prefetch_result = text + except Exception as e: + logger.debug("Hindsight prefetch failed: %s", e) + + self._prefetch_thread = threading.Thread(target=_run, daemon=True, name="hindsight-prefetch") + self._prefetch_thread.start() + + def sync_turn(self, user_content: str, assistant_content: str) -> None: + combined = f"User: {user_content}\nAssistant: {assistant_content}" + try: + _run_in_thread( + lambda: self._make_client().retain( + bank_id=self._bank_id, content=combined, context="conversation" + ) + ) + except Exception as e: + logger.warning("Hindsight sync failed: %s", e) + + def get_tool_schemas(self) -> List[Dict[str, Any]]: + return [RETAIN_SCHEMA, RECALL_SCHEMA, REFLECT_SCHEMA] + + def handle_tool_call(self, tool_name: str, args: dict, **kwargs) -> str: + if tool_name == "hindsight_retain": + content = args.get("content", "") + if not content: + return json.dumps({"error": "Missing required parameter: content"}) + context = args.get("context") + try: + _run_in_thread( + lambda: self._make_client().retain( + bank_id=self._bank_id, content=content, context=context + ) + ) + return json.dumps({"result": "Memory stored successfully."}) + except Exception as e: + return json.dumps({"error": f"Failed to store memory: {e}"}) + + elif tool_name == "hindsight_recall": + query = args.get("query", "") + if not query: + return json.dumps({"error": "Missing required 
parameter: query"}) + try: + resp = _run_in_thread( + lambda: self._make_client().recall( + bank_id=self._bank_id, query=query, budget=self._budget + ) + ) + if not resp.results: + return json.dumps({"result": "No relevant memories found."}) + lines = [f"{i}. {r.text}" for i, r in enumerate(resp.results, 1)] + return json.dumps({"result": "\n".join(lines)}) + except Exception as e: + return json.dumps({"error": f"Failed to search memory: {e}"}) + + elif tool_name == "hindsight_reflect": + query = args.get("query", "") + if not query: + return json.dumps({"error": "Missing required parameter: query"}) + try: + resp = _run_in_thread( + lambda: self._make_client().reflect( + bank_id=self._bank_id, query=query, budget=self._budget + ) + ) + return json.dumps({"result": resp.text or "No relevant memories found."}) + except Exception as e: + return json.dumps({"error": f"Failed to reflect: {e}"}) + + return json.dumps({"error": f"Unknown tool: {tool_name}"}) + + def shutdown(self) -> None: + if self._prefetch_thread and self._prefetch_thread.is_alive(): + self._prefetch_thread.join(timeout=5.0) + + +def register(ctx) -> None: + """Register Hindsight as a memory provider plugin.""" + ctx.register_memory_provider(HindsightMemoryProvider()) diff --git a/plugins/hindsight-memory/plugin.yaml b/plugins/hindsight-memory/plugin.yaml new file mode 100644 index 00000000000..6d4f12e58e8 --- /dev/null +++ b/plugins/hindsight-memory/plugin.yaml @@ -0,0 +1,8 @@ +name: hindsight-memory +version: 1.0.0 +description: > + Long-term memory via Hindsight — knowledge graph with entity resolution, + multi-strategy retrieval (semantic + BM25 + graph + temporal), and + cross-encoder reranking. Cloud or local mode. 
+requires_env: + - HINDSIGHT_API_KEY diff --git a/plugins/mem0-memory/__init__.py b/plugins/mem0-memory/__init__.py new file mode 100644 index 00000000000..c21a24e20a4 --- /dev/null +++ b/plugins/mem0-memory/__init__.py @@ -0,0 +1,286 @@ +"""Mem0 memory plugin — MemoryProvider interface. + +Server-side LLM fact extraction, semantic search with reranking, and +automatic deduplication via the Mem0 Platform API. + +Original PR #2933 by kartik-mem0, adapted to MemoryProvider ABC. + +Config via environment variables: + MEM0_API_KEY — Mem0 Platform API key (required) + MEM0_USER_ID — User identifier (default: hermes-user) + MEM0_AGENT_ID — Agent identifier (default: hermes) + +Or via $HERMES_HOME/mem0.json. +""" + +from __future__ import annotations + +import json +import logging +import os +import threading +from pathlib import Path +from typing import Any, Dict, List + +from agent.memory_provider import MemoryProvider + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Config +# --------------------------------------------------------------------------- + +def _load_config() -> dict: + """Load config from $HERMES_HOME/mem0.json or env vars.""" + hermes_home = os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes")) + config_path = Path(hermes_home) / "mem0.json" + + if config_path.exists(): + try: + return json.loads(config_path.read_text(encoding="utf-8")) + except Exception: + pass + + return { + "api_key": os.environ.get("MEM0_API_KEY", ""), + "user_id": os.environ.get("MEM0_USER_ID", "hermes-user"), + "agent_id": os.environ.get("MEM0_AGENT_ID", "hermes"), + "rerank": True, + "keyword_search": False, + } + + +# --------------------------------------------------------------------------- +# Tool schemas +# --------------------------------------------------------------------------- + +PROFILE_SCHEMA = { + "name": "mem0_profile", + "description": ( + "Retrieve all stored memories about the 
SEARCH_SCHEMA = {
    "name": "mem0_search",
    "description": (
        "Search memories by meaning. Returns relevant facts ranked by similarity. "
        "Set rerank=true for higher accuracy (+150ms)."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "What to search for.",
            },
            "rerank": {
                "type": "boolean",
                "description": "Enable reranking for precision (default: false).",
            },
            "top_k": {
                "type": "integer",
                "description": "Max results (default: 10, max: 50).",
            },
        },
        "required": ["query"],
    },
}

CONTEXT_SCHEMA = {
    "name": "mem0_context",
    "description": (
        "Deep retrieval with forced reranking. Use when you need the most "
        "relevant memories for a specific topic."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "What to search for.",
            },
        },
        "required": ["query"],
    },
}

CONCLUDE_SCHEMA = {
    "name": "mem0_conclude",
    "description": (
        "Store a durable fact about the user. Stored verbatim (no LLM extraction). "
        "Use for explicit preferences, corrections, or decisions."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "conclusion": {
                "type": "string",
                "description": "The fact to store.",
            },
        },
        "required": ["conclusion"],
    },
}
+ ), + "parameters": { + "type": "object", + "properties": { + "conclusion": {"type": "string", "description": "The fact to store."}, + }, + "required": ["conclusion"], + }, +} + + +# --------------------------------------------------------------------------- +# MemoryProvider implementation +# --------------------------------------------------------------------------- + +class Mem0MemoryProvider(MemoryProvider): + """Mem0 Platform memory with server-side extraction and semantic search.""" + + def __init__(self): + self._config = None + self._client = None + self._api_key = "" + self._user_id = "hermes-user" + self._agent_id = "hermes" + self._rerank = True + self._prefetch_result = "" + self._prefetch_lock = threading.Lock() + self._prefetch_thread = None + + @property + def name(self) -> str: + return "mem0" + + def is_available(self) -> bool: + cfg = _load_config() + return bool(cfg.get("api_key")) + + def _get_client(self): + if self._client is not None: + return self._client + try: + from mem0 import MemoryClient + self._client = MemoryClient(api_key=self._api_key) + return self._client + except ImportError: + raise RuntimeError("mem0 package not installed. Run: pip install mem0ai") + + def initialize(self, session_id: str, **kwargs) -> None: + self._config = _load_config() + self._api_key = self._config.get("api_key", "") + self._user_id = self._config.get("user_id", "hermes-user") + self._agent_id = self._config.get("agent_id", "hermes") + self._rerank = self._config.get("rerank", True) + + def system_prompt_block(self) -> str: + return ( + "# Mem0 Memory\n" + f"Active. User: {self._user_id}.\n" + "Use mem0_search to find memories, mem0_conclude to store facts, " + "mem0_profile for a full overview." 
+ ) + + def prefetch(self, query: str) -> str: + if self._prefetch_thread and self._prefetch_thread.is_alive(): + self._prefetch_thread.join(timeout=3.0) + with self._prefetch_lock: + result = self._prefetch_result + self._prefetch_result = "" + if not result: + return "" + return f"## Mem0 Memory\n{result}" + + def queue_prefetch(self, query: str) -> None: + def _run(): + try: + client = self._get_client() + results = client.search( + query=query, + user_id=self._user_id, + rerank=self._rerank, + top_k=5, + ) + if results: + lines = [r.get("memory", "") for r in results if r.get("memory")] + with self._prefetch_lock: + self._prefetch_result = "\n".join(f"- {l}" for l in lines) + except Exception as e: + logger.debug("Mem0 prefetch failed: %s", e) + + self._prefetch_thread = threading.Thread(target=_run, daemon=True, name="mem0-prefetch") + self._prefetch_thread.start() + + def sync_turn(self, user_content: str, assistant_content: str) -> None: + """Send the turn to Mem0 for server-side fact extraction.""" + try: + client = self._get_client() + messages = [ + {"role": "user", "content": user_content}, + {"role": "assistant", "content": assistant_content}, + ] + client.add(messages, user_id=self._user_id, agent_id=self._agent_id) + except Exception as e: + logger.warning("Mem0 sync failed: %s", e) + + def get_tool_schemas(self) -> List[Dict[str, Any]]: + return [PROFILE_SCHEMA, SEARCH_SCHEMA, CONTEXT_SCHEMA, CONCLUDE_SCHEMA] + + def handle_tool_call(self, tool_name: str, args: dict, **kwargs) -> str: + try: + client = self._get_client() + except Exception as e: + return json.dumps({"error": str(e)}) + + if tool_name == "mem0_profile": + try: + memories = client.get_all(user_id=self._user_id) + if not memories: + return json.dumps({"result": "No memories stored yet."}) + lines = [m.get("memory", "") for m in memories if m.get("memory")] + return json.dumps({"result": "\n".join(lines), "count": len(lines)}) + except Exception as e: + return json.dumps({"error": 
f"Failed to fetch profile: {e}"}) + + elif tool_name == "mem0_search": + query = args.get("query", "") + if not query: + return json.dumps({"error": "Missing required parameter: query"}) + rerank = args.get("rerank", False) + top_k = min(int(args.get("top_k", 10)), 50) + try: + results = client.search( + query=query, user_id=self._user_id, + rerank=rerank, top_k=top_k, + ) + if not results: + return json.dumps({"result": "No relevant memories found."}) + items = [{"memory": r.get("memory", ""), "score": r.get("score", 0)} for r in results] + return json.dumps({"results": items, "count": len(items)}) + except Exception as e: + return json.dumps({"error": f"Search failed: {e}"}) + + elif tool_name == "mem0_context": + query = args.get("query", "") + if not query: + return json.dumps({"error": "Missing required parameter: query"}) + try: + results = client.search( + query=query, user_id=self._user_id, + rerank=True, top_k=5, + ) + if not results: + return json.dumps({"result": "No relevant memories found."}) + items = [{"memory": r.get("memory", ""), "score": r.get("score", 0)} for r in results] + return json.dumps({"results": items, "count": len(items)}) + except Exception as e: + return json.dumps({"error": f"Context retrieval failed: {e}"}) + + elif tool_name == "mem0_conclude": + conclusion = args.get("conclusion", "") + if not conclusion: + return json.dumps({"error": "Missing required parameter: conclusion"}) + try: + client.add( + [{"role": "user", "content": conclusion}], + user_id=self._user_id, + agent_id=self._agent_id, + infer=False, + ) + return json.dumps({"result": "Fact stored."}) + except Exception as e: + return json.dumps({"error": f"Failed to store: {e}"}) + + return json.dumps({"error": f"Unknown tool: {tool_name}"}) + + def shutdown(self) -> None: + if self._prefetch_thread and self._prefetch_thread.is_alive(): + self._prefetch_thread.join(timeout=5.0) + self._client = None + + +def register(ctx) -> None: + """Register Mem0 as a memory provider 
plugin.""" + ctx.register_memory_provider(Mem0MemoryProvider()) diff --git a/plugins/mem0-memory/plugin.yaml b/plugins/mem0-memory/plugin.yaml new file mode 100644 index 00000000000..2a0ff01100d --- /dev/null +++ b/plugins/mem0-memory/plugin.yaml @@ -0,0 +1,7 @@ +name: mem0-memory +version: 1.0.0 +description: > + Long-term memory via Mem0 Platform — server-side LLM fact extraction, + semantic search with reranking, and automatic deduplication. +requires_env: + - MEM0_API_KEY