diff --git a/plugins/cognitive-memory/__init__.py b/plugins/cognitive-memory/__init__.py new file mode 100644 index 00000000000..28f6bc39893 --- /dev/null +++ b/plugins/cognitive-memory/__init__.py @@ -0,0 +1,378 @@ +"""Cognitive memory plugin — MemoryProvider interface. + +Semantic memory with vector embeddings (via litellm), auto-classification, +contradiction detection, importance decay, and time-based forgetting. +Local SQLite storage with binary-packed float32 embeddings. + +Original PR #727 by 0xbyt4, adapted to MemoryProvider ABC. + +Requires: litellm (for embeddings via any provider — OpenAI, Cohere, etc.) +Config via environment: uses litellm's standard env vars (OPENAI_API_KEY, etc.) +""" + +from __future__ import annotations + +import json +import logging +import math +import os +import re +import sqlite3 +import struct +import time +from pathlib import Path +from typing import Any, Dict, List, Optional + +from agent.memory_provider import MemoryProvider + +logger = logging.getLogger(__name__) + +_DB_DIR = Path(os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes"))) / "cognitive_memory" +_EMBEDDING_DIM = 1536 # text-embedding-3-small default +_SIMILARITY_DEDUP_THRESHOLD = 0.95 + + +# --------------------------------------------------------------------------- +# Embedding helper +# --------------------------------------------------------------------------- + +def _get_embedding(text: str) -> Optional[List[float]]: + """Get embedding via litellm.""" + try: + import litellm + resp = litellm.embedding(model="text-embedding-3-small", input=[text]) + return resp.data[0]["embedding"] + except Exception as e: + logger.debug("Embedding failed: %s", e) + return None + + +def _cosine_similarity(a: List[float], b: List[float]) -> float: + dot = sum(x * y for x, y in zip(a, b)) + mag_a = math.sqrt(sum(x * x for x in a)) + mag_b = math.sqrt(sum(x * x for x in b)) + if mag_a == 0 or mag_b == 0: + return 0.0 + return dot / (mag_a * mag_b) + + +def 
def _pack_embedding(emb: List[float]) -> bytes:
    """Serialize a float vector as packed float32 bytes (native order)."""
    return struct.pack(f"{len(emb)}f", *emb)


def _unpack_embedding(data: bytes) -> List[float]:
    """Inverse of _pack_embedding: packed float32 bytes -> list of floats."""
    count = len(data) // 4  # 4 bytes per float32
    return list(struct.unpack(f"{count}f", data))


# ---------------------------------------------------------------------------
# Classification
# ---------------------------------------------------------------------------

# Tried in declaration order — earlier categories win on the first regex hit.
_CATEGORY_PATTERNS = {
    "preference": [r"\b(?:prefer|like|love|hate|dislike|favorite)\b"],
    "correction": [r"\b(?:actually|no,|wrong|incorrect|not right)\b"],
    "fact": [r"\b(?:is|are|was|were|has|have)\b"],
    "procedure": [r"\b(?:first|then|step|always|never|usually)\b"],
    "environment": [r"\b(?:running|using|installed|version|os|platform)\b"],
}

# Per-category base importance; anything unlisted starts at 0.5.
_BASE_IMPORTANCE = {"correction": 0.9, "preference": 0.7, "procedure": 0.6}


def _classify(content: str) -> str:
    """Best-effort category for *content* via keyword regexes.

    Categories are tried in declaration order; the first pattern that
    matches wins, falling back to "general".
    """
    lowered = content.lower()
    for category, patterns in _CATEGORY_PATTERNS.items():
        if any(re.search(pattern, lowered) for pattern in patterns):
            return category
    return "general"


def _estimate_importance(content: str, category: str) -> float:
    """Heuristic importance in [0, 1]: category base plus a small length bonus."""
    # Longer content is treated as slightly more important (capped at +0.2).
    length_bonus = min(len(content) / 500, 0.2)
    return min(_BASE_IMPORTANCE.get(category, 0.5) + length_bonus, 1.0)


# ---------------------------------------------------------------------------
# Tool schema
# ---------------------------------------------------------------------------

COGNITIVE_RECALL_SCHEMA = {
    "name": "cognitive_recall",
    "description": (
        "Semantic memory with automatic classification and importance scoring. "
        "Actions: recall (search by meaning), store (add a fact), "
        "forget (remove by ID), status (memory stats)."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": ["recall", "store", "forget", "status"],
                "description": "Action to perform.",
            },
            "query": {"type": "string", "description": "Search query (for 'recall')."},
            "content": {"type": "string", "description": "Fact to store (for 'store')."},
            "category": {
                "type": "string",
                "enum": ["preference", "fact", "procedure", "environment", "correction", "general"],
                "description": "Category (auto-detected if omitted).",
            },
            "memory_id": {"type": "integer", "description": "Memory ID (for 'forget')."},
        },
        "required": ["action"],
    },
}
class CognitiveMemoryProvider(MemoryProvider):
    """Semantic memory with embeddings, classification, and forgetting."""

    def __init__(self):
        # Connection is created in initialize(); None means "not ready yet".
        self._conn = None
        self._decay_half_life = 30  # importance half-life, in days
        self._last_decay = 0  # unix time of the last decay cycle

    @property
    def name(self) -> str:
        return "cognitive"

    def is_available(self) -> bool:
        """Available only when litellm (the embedding backend) is importable."""
        try:
            import litellm  # noqa: F401
        except ImportError:
            return False
        return True

    def initialize(self, session_id: str, **kwargs) -> None:
        """Open (creating if needed) the shared SQLite store.

        NOTE(review): storage is global across sessions — session_id is
        intentionally unused here; confirm that is the desired scope.
        """
        _DB_DIR.mkdir(parents=True, exist_ok=True)
        self._conn = sqlite3.connect(str(_DB_DIR / "cognitive.db"))
        # WAL allows concurrent readers while a writer is active.
        self._conn.execute("PRAGMA journal_mode=WAL")
        self._conn.executescript("""
            CREATE TABLE IF NOT EXISTS memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                content TEXT NOT NULL,
                category TEXT DEFAULT 'general',
                importance REAL DEFAULT 0.5,
                embedding BLOB,
                retrieval_count INTEGER DEFAULT 0,
                helpful_count INTEGER DEFAULT 0,
                created_at REAL,
                updated_at REAL,
                deleted INTEGER DEFAULT 0
            );
            CREATE INDEX IF NOT EXISTS idx_mem_importance ON memories(importance DESC);
            CREATE INDEX IF NOT EXISTS idx_mem_category ON memories(category);
        """)
        self._conn.commit()
memories(category); + """) + self._conn.commit() + + def system_prompt_block(self) -> str: + if not self._conn: + return "" + try: + count = self._conn.execute( + "SELECT COUNT(*) FROM memories WHERE deleted = 0" + ).fetchone()[0] + except Exception: + count = 0 + if count == 0: + return "" + return ( + f"# Cognitive Memory\n" + f"Active. {count} memories with semantic recall and importance scoring.\n" + f"Use cognitive_recall to search, store facts, or check status.\n" + f"Memories decay over time — frequently used facts persist, unused ones fade." + ) + + def prefetch(self, query: str) -> str: + if not self._conn or not query: + return "" + emb = _get_embedding(query) + if not emb: + return "" + try: + rows = self._conn.execute( + "SELECT id, content, importance, embedding FROM memories " + "WHERE deleted = 0 AND embedding IS NOT NULL " + "ORDER BY importance DESC LIMIT 50" + ).fetchall() + scored = [] + now = time.time() + for row in rows: + mem_emb = _unpack_embedding(row[3]) + sim = _cosine_similarity(emb, mem_emb) + importance = row[2] + score = 0.5 * sim + 0.3 * importance + 0.2 * max(0, 1 - (now - (row[0] * 86400)) / (30 * 86400)) + if sim > 0.3: + scored.append((score, row[1])) + scored.sort(reverse=True) + if not scored: + return "" + lines = [f"- {content}" for _, content in scored[:5]] + return "## Cognitive Memory\n" + "\n".join(lines) + except Exception as e: + logger.debug("Cognitive prefetch failed: %s", e) + return "" + + def sync_turn(self, user_content: str, assistant_content: str) -> None: + # Run decay cycle periodically + self._maybe_decay() + + def get_tool_schemas(self) -> List[Dict[str, Any]]: + return [COGNITIVE_RECALL_SCHEMA] + + def handle_tool_call(self, tool_name: str, args: dict, **kwargs) -> str: + if tool_name != "cognitive_recall": + return json.dumps({"error": f"Unknown tool: {tool_name}"}) + + action = args.get("action", "") + + if action == "store": + return self._store(args) + elif action == "recall": + return 
self._recall(args) + elif action == "forget": + return self._forget(args) + elif action == "status": + return self._status() + return json.dumps({"error": f"Unknown action: {action}"}) + + def on_memory_write(self, action: str, target: str, content: str) -> None: + if action == "add" and self._conn and content: + category = "preference" if target == "user" else _classify(content) + importance = _estimate_importance(content, category) + emb = _get_embedding(content) + now = time.time() + self._conn.execute( + "INSERT INTO memories (content, category, importance, embedding, created_at, updated_at) " + "VALUES (?, ?, ?, ?, ?, ?)", + (content, category, importance, _pack_embedding(emb) if emb else None, now, now), + ) + self._conn.commit() + + def shutdown(self) -> None: + if self._conn: + self._conn.close() + self._conn = None + + # -- Internal methods ---------------------------------------------------- + + def _store(self, args: dict) -> str: + content = args.get("content", "") + if not content: + return json.dumps({"error": "content is required"}) + + category = args.get("category") or _classify(content) + importance = _estimate_importance(content, category) + emb = _get_embedding(content) + + # Dedup check + if emb: + rows = self._conn.execute( + "SELECT id, embedding FROM memories WHERE deleted = 0 AND embedding IS NOT NULL" + ).fetchall() + for row in rows: + existing_emb = _unpack_embedding(row[1]) + if _cosine_similarity(emb, existing_emb) > _SIMILARITY_DEDUP_THRESHOLD: + return json.dumps({"error": "Very similar memory already exists", "existing_id": row[0]}) + + now = time.time() + cur = self._conn.execute( + "INSERT INTO memories (content, category, importance, embedding, created_at, updated_at) " + "VALUES (?, ?, ?, ?, ?, ?)", + (content, category, importance, _pack_embedding(emb) if emb else None, now, now), + ) + self._conn.commit() + return json.dumps({"id": cur.lastrowid, "category": category, "importance": round(importance, 2)}) + + def _recall(self, 
args: dict) -> str: + query = args.get("query", "") + if not query: + return json.dumps({"error": "query is required"}) + + emb = _get_embedding(query) + if not emb: + return json.dumps({"error": "Embedding generation failed"}) + + rows = self._conn.execute( + "SELECT id, content, category, importance, embedding, created_at FROM memories " + "WHERE deleted = 0 AND embedding IS NOT NULL " + "ORDER BY importance DESC LIMIT 50" + ).fetchall() + + now = time.time() + results = [] + for row in rows: + mem_emb = _unpack_embedding(row[4]) + sim = _cosine_similarity(emb, mem_emb) + days_old = (now - (row[5] or now)) / 86400 + recency = max(0, 1 - days_old / 90) + score = 0.5 * sim + 0.3 * row[3] + 0.2 * recency + if sim > 0.2: + results.append({ + "id": row[0], "content": row[1], "category": row[2], + "score": round(score, 3), "similarity": round(sim, 3), + }) + + results.sort(key=lambda x: x["score"], reverse=True) + # Bump retrieval counts + for r in results[:10]: + self._conn.execute( + "UPDATE memories SET retrieval_count = retrieval_count + 1 WHERE id = ?", + (r["id"],), + ) + self._conn.commit() + return json.dumps({"results": results[:10], "count": len(results[:10])}) + + def _forget(self, args: dict) -> str: + memory_id = args.get("memory_id") + if memory_id is None: + return json.dumps({"error": "memory_id is required"}) + self._conn.execute("UPDATE memories SET deleted = 1 WHERE id = ?", (int(memory_id),)) + self._conn.commit() + return json.dumps({"forgotten": True, "id": memory_id}) + + def _status(self) -> str: + total = self._conn.execute("SELECT COUNT(*) FROM memories WHERE deleted = 0").fetchone()[0] + by_cat = self._conn.execute( + "SELECT category, COUNT(*) FROM memories WHERE deleted = 0 GROUP BY category" + ).fetchall() + return json.dumps({ + "total": total, + "by_category": {row[0]: row[1] for row in by_cat}, + "decay_half_life_days": self._decay_half_life, + }) + + def _maybe_decay(self) -> None: + """Run importance decay every ~1 hour.""" + now = 
time.time() + if now - self._last_decay < 3600: + return + self._last_decay = now + if not self._conn or self._decay_half_life <= 0: + return + try: + factor = 0.5 ** (1.0 / self._decay_half_life) + self._conn.execute( + "UPDATE memories SET importance = importance * ? WHERE deleted = 0", + (factor,), + ) + # Prune very low importance + self._conn.execute( + "UPDATE memories SET deleted = 1 WHERE deleted = 0 AND importance < 0.05" + ) + self._conn.commit() + except Exception as e: + logger.debug("Cognitive decay failed: %s", e) + + +def register(ctx) -> None: + """Register cognitive memory as a memory provider plugin.""" + ctx.register_memory_provider(CognitiveMemoryProvider()) diff --git a/plugins/cognitive-memory/plugin.yaml b/plugins/cognitive-memory/plugin.yaml new file mode 100644 index 00000000000..a48138c7467 --- /dev/null +++ b/plugins/cognitive-memory/plugin.yaml @@ -0,0 +1,6 @@ +name: cognitive-memory +version: 1.0.0 +description: > + Semantic memory with vector embeddings, auto-classification, contradiction + detection, importance decay, and time-based forgetting. Local SQLite storage, + requires litellm for embeddings. diff --git a/plugins/openviking-memory/__init__.py b/plugins/openviking-memory/__init__.py new file mode 100644 index 00000000000..b1e6f712084 --- /dev/null +++ b/plugins/openviking-memory/__init__.py @@ -0,0 +1,199 @@ +"""OpenViking memory plugin — MemoryProvider interface. + +Read-only semantic search over a self-hosted OpenViking knowledge server. +Supports search (fast/deep/auto), URI-based content reading, and +filesystem-style browsing. + +Original PR #3369 by Mibayy, adapted to MemoryProvider ABC. 
+ +Config via environment variables: + OPENVIKING_ENDPOINT — Server URL (default: http://127.0.0.1:1933) + OPENVIKING_API_KEY — Optional API key +""" + +from __future__ import annotations + +import json +import logging +import os +from typing import Any, Dict, List + +from agent.memory_provider import MemoryProvider + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Tool schemas +# --------------------------------------------------------------------------- + +SEARCH_SCHEMA = { + "name": "viking_search", + "description": ( + "Semantic search over OpenViking knowledge base. " + "Returns ranked results with URIs for deeper reading." + ), + "parameters": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "Search query."}, + "mode": { + "type": "string", "enum": ["auto", "fast", "deep"], + "description": "Search depth (default: auto).", + }, + "scope": {"type": "string", "description": "URI prefix to scope search."}, + "limit": {"type": "integer", "description": "Max results (default: 10)."}, + }, + "required": ["query"], + }, +} + +READ_SCHEMA = { + "name": "viking_read", + "description": ( + "Read content at a viking:// URI. Supports three detail levels: " + "abstract (summary), overview (key points), read (full content)." + ), + "parameters": { + "type": "object", + "properties": { + "uri": {"type": "string", "description": "viking:// URI to read."}, + "level": { + "type": "string", "enum": ["abstract", "overview", "read"], + "description": "Detail level (default: overview).", + }, + }, + "required": ["uri"], + }, +} + +BROWSE_SCHEMA = { + "name": "viking_browse", + "description": ( + "Browse the OpenViking knowledge store like a filesystem. " + "Supports tree (hierarchy), list (directory), and stat (metadata)." 
class OpenVikingMemoryProvider(MemoryProvider):
    """Read-only memory via OpenViking self-hosted knowledge server."""

    def __init__(self):
        # Populated in initialize(); empty strings until then.
        self._endpoint = ""
        self._api_key = ""

    @property
    def name(self) -> str:
        return "openviking"

    def is_available(self) -> bool:
        """True when OPENVIKING_ENDPOINT is set and its /health answers 200."""
        endpoint = os.environ.get("OPENVIKING_ENDPOINT", "")
        if not endpoint:
            return False
        try:
            import httpx

            return httpx.get(f"{endpoint}/health", timeout=3.0).status_code == 200
        except Exception:
            return False

    def initialize(self, session_id: str, **kwargs) -> None:
        """Read endpoint/key from the environment (session_id unused)."""
        self._endpoint = os.environ.get("OPENVIKING_ENDPOINT", "http://127.0.0.1:1933")
        self._api_key = os.environ.get("OPENVIKING_API_KEY", "")

    def _headers(self) -> dict:
        headers = {"Content-Type": "application/json"}
        if self._api_key:
            headers["X-API-Key"] = self._api_key
        return headers

    def system_prompt_block(self) -> str:
        return (
            "# OpenViking Knowledge Base\n"
            f"Active. Endpoint: {self._endpoint}\n"
            "Use viking_search to find information, viking_read for details, "
            "viking_browse to explore the knowledge tree."
        )
+ ) + + def prefetch(self, query: str) -> str: + """OpenViking is tool-driven, no automatic prefetch.""" + return "" + + def get_tool_schemas(self) -> List[Dict[str, Any]]: + return [SEARCH_SCHEMA, READ_SCHEMA, BROWSE_SCHEMA] + + def handle_tool_call(self, tool_name: str, args: dict, **kwargs) -> str: + try: + import httpx + except ImportError: + return json.dumps({"error": "httpx not installed"}) + + try: + if tool_name == "viking_search": + return self._search(httpx, args) + elif tool_name == "viking_read": + return self._read(httpx, args) + elif tool_name == "viking_browse": + return self._browse(httpx, args) + return json.dumps({"error": f"Unknown tool: {tool_name}"}) + except Exception as e: + return json.dumps({"error": str(e)}) + + def _search(self, httpx, args: dict) -> str: + query = args.get("query", "") + if not query: + return json.dumps({"error": "query is required"}) + payload = {"query": query, "mode": args.get("mode", "auto")} + if args.get("scope"): + payload["scope"] = args["scope"] + if args.get("limit"): + payload["limit"] = args["limit"] + resp = httpx.post( + f"{self._endpoint}/v1/search", + json=payload, headers=self._headers(), timeout=30.0, + ) + return resp.text + + def _read(self, httpx, args: dict) -> str: + uri = args.get("uri", "") + if not uri: + return json.dumps({"error": "uri is required"}) + level = args.get("level", "overview") + resp = httpx.post( + f"{self._endpoint}/v1/read", + json={"uri": uri, "level": level}, + headers=self._headers(), timeout=30.0, + ) + return resp.text + + def _browse(self, httpx, args: dict) -> str: + action = args.get("action", "tree") + path = args.get("path", "/") + resp = httpx.post( + f"{self._endpoint}/v1/browse", + json={"action": action, "path": path}, + headers=self._headers(), timeout=30.0, + ) + return resp.text + + +def register(ctx) -> None: + """Register OpenViking as a memory provider plugin.""" + ctx.register_memory_provider(OpenVikingMemoryProvider()) diff --git 
a/plugins/openviking-memory/plugin.yaml b/plugins/openviking-memory/plugin.yaml new file mode 100644 index 00000000000..09cc3a9ed44 --- /dev/null +++ b/plugins/openviking-memory/plugin.yaml @@ -0,0 +1,7 @@ +name: openviking-memory +version: 1.0.0 +description: > + Read-only memory via OpenViking — semantic search, URI-based content + reading, and filesystem browsing over a self-hosted knowledge server. +requires_env: + - OPENVIKING_ENDPOINT diff --git a/plugins/retaindb-memory/__init__.py b/plugins/retaindb-memory/__init__.py new file mode 100644 index 00000000000..1e591ff54b2 --- /dev/null +++ b/plugins/retaindb-memory/__init__.py @@ -0,0 +1,273 @@ +"""RetainDB memory plugin — MemoryProvider interface. + +Cross-session memory via RetainDB cloud API. Durable write-behind queue, +semantic search with deduplication, and user profile retrieval. + +Original PR #2732 by Alinxus, adapted to MemoryProvider ABC. + +Config via environment variables: + RETAINDB_API_KEY — API key (required) + RETAINDB_BASE_URL — API endpoint (default: https://api.retaindb.com) + RETAINDB_PROJECT — Project identifier (default: hermes) +""" + +from __future__ import annotations + +import json +import logging +import os +import threading +from typing import Any, Dict, List + +from agent.memory_provider import MemoryProvider + +logger = logging.getLogger(__name__) + +_DEFAULT_BASE_URL = "https://api.retaindb.com" + + +# --------------------------------------------------------------------------- +# Tool schemas +# --------------------------------------------------------------------------- + +PROFILE_SCHEMA = { + "name": "retaindb_profile", + "description": "Get the user's stable profile — preferences, facts, and patterns.", + "parameters": {"type": "object", "properties": {}, "required": []}, +} + +SEARCH_SCHEMA = { + "name": "retaindb_search", + "description": ( + "Semantic search across stored memories. Returns ranked results " + "with relevance scores." 
CONTEXT_SCHEMA = {
    "name": "retaindb_context",
    "description": "Synthesized 'what matters now' context block for the current task.",
    "parameters": {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Current task or question."},
        },
        "required": ["query"],
    },
}

REMEMBER_SCHEMA = {
    "name": "retaindb_remember",
    "description": "Persist an explicit fact or preference to long-term memory.",
    "parameters": {
        "type": "object",
        "properties": {
            "content": {"type": "string", "description": "The fact to remember."},
            "memory_type": {
                "type": "string",
                "enum": ["preference", "fact", "decision", "context"],
                "description": "Category (default: fact).",
            },
            "importance": {
                "type": "number",
                "description": "Importance 0-1 (default: 0.5).",
            },
        },
        "required": ["content"],
    },
}

FORGET_SCHEMA = {
    "name": "retaindb_forget",
    "description": "Delete a specific memory by ID.",
    "parameters": {
        "type": "object",
        "properties": {
            "memory_id": {"type": "string", "description": "Memory ID to delete."},
        },
        "required": ["memory_id"],
    },
}


# ---------------------------------------------------------------------------
# MemoryProvider implementation
# ---------------------------------------------------------------------------

class RetainDBMemoryProvider(MemoryProvider):
    """RetainDB cloud memory with write-behind queue and semantic search."""

    def __init__(self):
        self._api_key = ""
        self._base_url = _DEFAULT_BASE_URL
        self._project = "hermes"
        self._user_id = ""
        # BUG FIX: _session_id was only assigned in initialize() but is read
        # by sync_turn(); declaring it here prevents an AttributeError when a
        # turn is synced before (or without) initialization.
        self._session_id = ""
        self._prefetch_result = ""
        self._prefetch_lock = threading.Lock()
        self._prefetch_thread = None

    @property
    def name(self) -> str:
        return "retaindb"
is_available(self) -> bool: + return bool(os.environ.get("RETAINDB_API_KEY")) + + def _headers(self) -> dict: + return { + "Authorization": f"Bearer {self._api_key}", + "Content-Type": "application/json", + } + + def _api(self, method: str, path: str, **kwargs): + """Make an API call to RetainDB.""" + import requests + url = f"{self._base_url}{path}" + resp = requests.request(method, url, headers=self._headers(), timeout=30, **kwargs) + resp.raise_for_status() + return resp.json() + + def initialize(self, session_id: str, **kwargs) -> None: + self._api_key = os.environ.get("RETAINDB_API_KEY", "") + self._base_url = os.environ.get("RETAINDB_BASE_URL", _DEFAULT_BASE_URL) + self._project = os.environ.get("RETAINDB_PROJECT", "hermes") + self._user_id = kwargs.get("user_id", "default") + self._session_id = session_id + + def system_prompt_block(self) -> str: + return ( + "# RetainDB Memory\n" + f"Active. Project: {self._project}.\n" + "Use retaindb_search to find memories, retaindb_remember to store facts, " + "retaindb_profile for a user overview, retaindb_context for task-relevant context." 
+ ) + + def prefetch(self, query: str) -> str: + if self._prefetch_thread and self._prefetch_thread.is_alive(): + self._prefetch_thread.join(timeout=3.0) + with self._prefetch_lock: + result = self._prefetch_result + self._prefetch_result = "" + if not result: + return "" + return f"## RetainDB Memory\n{result}" + + def queue_prefetch(self, query: str) -> None: + def _run(): + try: + data = self._api("POST", "/v1/recall", json={ + "project": self._project, + "query": query, + "user_id": self._user_id, + "top_k": 5, + }) + results = data.get("results", []) + if results: + lines = [r.get("content", "") for r in results if r.get("content")] + with self._prefetch_lock: + self._prefetch_result = "\n".join(f"- {l}" for l in lines) + except Exception as e: + logger.debug("RetainDB prefetch failed: %s", e) + + self._prefetch_thread = threading.Thread(target=_run, daemon=True, name="retaindb-prefetch") + self._prefetch_thread.start() + + def sync_turn(self, user_content: str, assistant_content: str) -> None: + try: + self._api("POST", "/v1/ingest", json={ + "project": self._project, + "user_id": self._user_id, + "session_id": self._session_id, + "messages": [ + {"role": "user", "content": user_content}, + {"role": "assistant", "content": assistant_content}, + ], + }) + except Exception as e: + logger.warning("RetainDB sync failed: %s", e) + + def get_tool_schemas(self) -> List[Dict[str, Any]]: + return [PROFILE_SCHEMA, SEARCH_SCHEMA, CONTEXT_SCHEMA, REMEMBER_SCHEMA, FORGET_SCHEMA] + + def handle_tool_call(self, tool_name: str, args: dict, **kwargs) -> str: + try: + if tool_name == "retaindb_profile": + data = self._api("GET", f"/v1/profile/{self._project}/{self._user_id}") + return json.dumps(data) + + elif tool_name == "retaindb_search": + query = args.get("query", "") + if not query: + return json.dumps({"error": "query is required"}) + data = self._api("POST", "/v1/search", json={ + "project": self._project, + "user_id": self._user_id, + "query": query, + "top_k": 
min(int(args.get("top_k", 8)), 20), + }) + return json.dumps(data) + + elif tool_name == "retaindb_context": + query = args.get("query", "") + if not query: + return json.dumps({"error": "query is required"}) + data = self._api("POST", "/v1/recall", json={ + "project": self._project, + "user_id": self._user_id, + "query": query, + "top_k": 5, + }) + return json.dumps(data) + + elif tool_name == "retaindb_remember": + content = args.get("content", "") + if not content: + return json.dumps({"error": "content is required"}) + data = self._api("POST", "/v1/remember", json={ + "project": self._project, + "user_id": self._user_id, + "content": content, + "memory_type": args.get("memory_type", "fact"), + "importance": float(args.get("importance", 0.5)), + }) + return json.dumps(data) + + elif tool_name == "retaindb_forget": + memory_id = args.get("memory_id", "") + if not memory_id: + return json.dumps({"error": "memory_id is required"}) + data = self._api("DELETE", f"/v1/memory/{memory_id}") + return json.dumps(data) + + return json.dumps({"error": f"Unknown tool: {tool_name}"}) + except Exception as e: + return json.dumps({"error": str(e)}) + + def on_memory_write(self, action: str, target: str, content: str) -> None: + if action == "add": + try: + self._api("POST", "/v1/remember", json={ + "project": self._project, + "user_id": self._user_id, + "content": content, + "memory_type": "preference" if target == "user" else "fact", + }) + except Exception as e: + logger.debug("RetainDB memory bridge failed: %s", e) + + def shutdown(self) -> None: + if self._prefetch_thread and self._prefetch_thread.is_alive(): + self._prefetch_thread.join(timeout=5.0) + + +def register(ctx) -> None: + """Register RetainDB as a memory provider plugin.""" + ctx.register_memory_provider(RetainDBMemoryProvider()) diff --git a/plugins/retaindb-memory/plugin.yaml b/plugins/retaindb-memory/plugin.yaml new file mode 100644 index 00000000000..d176f5b0df5 --- /dev/null +++ 
b/plugins/retaindb-memory/plugin.yaml @@ -0,0 +1,7 @@ +name: retaindb-memory +version: 1.0.0 +description: > + Cross-session memory via RetainDB — durable write-behind queue, semantic + search with deduplication, user identity resolution, and profile retrieval. +requires_env: + - RETAINDB_API_KEY