diff --git a/plugins/hermes-memory-store/__init__.py b/plugins/hermes-memory-store/__init__.py new file mode 100644 index 00000000000..86435f7a9c0 --- /dev/null +++ b/plugins/hermes-memory-store/__init__.py @@ -0,0 +1,365 @@ +"""hermes-memory-store — holographic memory plugin using MemoryProvider interface. + +Registers as a MemoryProvider plugin, giving the agent structured fact storage +with entity resolution, trust scoring, and HRR-based compositional retrieval. + +Original plugin by dusterbloom (PR #2351), adapted to the MemoryProvider ABC. + +Config in ~/.hermes/config.yaml: + plugins: + hermes-memory-store: + db_path: ~/.hermes/memory_store.db + auto_extract: false + default_trust: 0.5 + min_trust_threshold: 0.3 + temporal_decay_half_life: 0 +""" + +from __future__ import annotations + +import json +import logging +import re +from pathlib import Path +from typing import Any, Dict, List + +from agent.memory_provider import MemoryProvider +from .store import MemoryStore +from .retrieval import FactRetriever + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Tool schemas (unchanged from original PR) +# --------------------------------------------------------------------------- + +FACT_STORE_SCHEMA = { + "name": "fact_store", + "description": ( + "Deep structured memory with algebraic reasoning. " + "Use alongside the memory tool — memory for always-on context, " + "fact_store for deep recall and compositional queries.\n\n" + "ACTIONS (simple → powerful):\n" + "• add — Store a fact the user would expect you to remember.\n" + "• search — Keyword lookup ('editor config', 'deploy process').\n" + "• probe — Entity recall: ALL facts about a person/thing.\n" + "• related — What connects to an entity? 
Structural adjacency.\n" + "• reason — Compositional: facts connected to MULTIPLE entities simultaneously.\n" + "• contradict — Memory hygiene: find facts making conflicting claims.\n" + "• update/remove/list — CRUD operations.\n\n" + "IMPORTANT: Before answering questions about the user, ALWAYS probe or reason first." + ), + "parameters": { + "type": "object", + "properties": { + "action": { + "type": "string", + "enum": ["add", "search", "probe", "related", "reason", "contradict", "update", "remove", "list"], + }, + "content": {"type": "string", "description": "Fact content (required for 'add')."}, + "query": {"type": "string", "description": "Search query (required for 'search')."}, + "entity": {"type": "string", "description": "Entity name for 'probe'/'related'."}, + "entities": {"type": "array", "items": {"type": "string"}, "description": "Entity names for 'reason'."}, + "fact_id": {"type": "integer", "description": "Fact ID for 'update'/'remove'."}, + "category": {"type": "string", "enum": ["user_pref", "project", "tool", "general"]}, + "tags": {"type": "string", "description": "Comma-separated tags."}, + "trust_delta": {"type": "number", "description": "Trust adjustment for 'update'."}, + "min_trust": {"type": "number", "description": "Minimum trust filter (default: 0.3)."}, + "limit": {"type": "integer", "description": "Max results (default: 10)."}, + }, + "required": ["action"], + }, +} + +FACT_FEEDBACK_SCHEMA = { + "name": "fact_feedback", + "description": ( + "Rate a fact after using it. Mark 'helpful' if accurate, 'unhelpful' if outdated. " + "This trains the memory — good facts rise, bad facts sink." 
+ ), + "parameters": { + "type": "object", + "properties": { + "action": {"type": "string", "enum": ["helpful", "unhelpful"]}, + "fact_id": {"type": "integer", "description": "The fact ID to rate."}, + }, + "required": ["action", "fact_id"], + }, +} + + +# --------------------------------------------------------------------------- +# Config +# --------------------------------------------------------------------------- + +def _load_plugin_config() -> dict: + config_path = Path("~/.hermes/config.yaml").expanduser() + if not config_path.exists(): + return {} + try: + import yaml + with open(config_path) as f: + all_config = yaml.safe_load(f) or {} + return all_config.get("plugins", {}).get("hermes-memory-store", {}) or {} + except Exception: + return {} + + +# --------------------------------------------------------------------------- +# MemoryProvider implementation +# --------------------------------------------------------------------------- + +class HolographicMemoryProvider(MemoryProvider): + """Holographic memory with structured facts, entity resolution, and HRR retrieval.""" + + def __init__(self, config: dict | None = None): + self._config = config or _load_plugin_config() + self._store = None + self._retriever = None + self._min_trust = float(self._config.get("min_trust_threshold", 0.3)) + + @property + def name(self) -> str: + return "holographic" + + def is_available(self) -> bool: + return True # SQLite is always available, numpy is optional + + def initialize(self, session_id: str, **kwargs) -> None: + db_path = self._config.get("db_path", "~/.hermes/memory_store.db") + default_trust = float(self._config.get("default_trust", 0.5)) + hrr_dim = int(self._config.get("hrr_dim", 1024)) + hrr_weight = float(self._config.get("hrr_weight", 0.3)) + temporal_decay = int(self._config.get("temporal_decay_half_life", 0)) + + self._store = MemoryStore(db_path=db_path, default_trust=default_trust, hrr_dim=hrr_dim) + self._retriever = FactRetriever( + store=self._store, 
+ temporal_decay_half_life=temporal_decay, + hrr_weight=hrr_weight, + hrr_dim=hrr_dim, + ) + self._session_id = session_id + + def system_prompt_block(self) -> str: + if not self._store: + return "" + try: + total = self._store._conn.execute( + "SELECT COUNT(*) FROM facts" + ).fetchone()[0] + except Exception: + total = 0 + if total == 0: + return "" + return ( + f"# Holographic Memory\n" + f"Active. {total} facts stored with entity resolution and trust scoring.\n" + f"Use fact_store to search, probe entities, reason across entities, or add facts.\n" + f"Use fact_feedback to rate facts after using them (trains trust scores)." + ) + + def prefetch(self, query: str) -> str: + if not self._retriever or not query: + return "" + try: + results = self._retriever.search(query, min_trust=self._min_trust, limit=5) + if not results: + return "" + lines = [] + for r in results: + trust = r.get("trust", 0) + lines.append(f"- [{trust:.1f}] {r.get('content', '')}") + return "## Holographic Memory\n" + "\n".join(lines) + except Exception as e: + logger.debug("Holographic prefetch failed: %s", e) + return "" + + def sync_turn(self, user_content: str, assistant_content: str) -> None: + # Holographic memory stores explicit facts via tools, not auto-sync. + # The on_session_end hook handles auto-extraction if configured. 
+ pass + + def get_tool_schemas(self) -> List[Dict[str, Any]]: + return [FACT_STORE_SCHEMA, FACT_FEEDBACK_SCHEMA] + + def handle_tool_call(self, tool_name: str, args: Dict[str, Any], **kwargs) -> str: + if tool_name == "fact_store": + return self._handle_fact_store(args) + elif tool_name == "fact_feedback": + return self._handle_fact_feedback(args) + return json.dumps({"error": f"Unknown tool: {tool_name}"}) + + def on_session_end(self, messages: List[Dict[str, Any]]) -> None: + if not self._config.get("auto_extract", False): + return + if not self._store or not messages: + return + self._auto_extract_facts(messages) + + def on_memory_write(self, action: str, target: str, content: str) -> None: + """Mirror built-in memory writes as facts.""" + if action == "add" and self._store and content: + try: + category = "user_pref" if target == "user" else "general" + self._store.add_fact(content, category=category) + except Exception as e: + logger.debug("Holographic memory_write mirror failed: %s", e) + + def shutdown(self) -> None: + self._store = None + self._retriever = None + + # -- Tool handlers ------------------------------------------------------- + + def _handle_fact_store(self, args: dict) -> str: + try: + action = args["action"] + store = self._store + retriever = self._retriever + + if action == "add": + fact_id = store.add_fact( + args["content"], + category=args.get("category", "general"), + tags=args.get("tags", ""), + ) + return json.dumps({"fact_id": fact_id, "status": "added"}) + + elif action == "search": + results = retriever.search( + args["query"], + category=args.get("category"), + min_trust=float(args.get("min_trust", self._min_trust)), + limit=int(args.get("limit", 10)), + ) + return json.dumps({"results": results, "count": len(results)}) + + elif action == "probe": + results = retriever.probe( + args["entity"], + category=args.get("category"), + limit=int(args.get("limit", 10)), + ) + return json.dumps({"results": results, "count": len(results)}) 
+ + elif action == "related": + results = retriever.related( + args["entity"], + category=args.get("category"), + limit=int(args.get("limit", 10)), + ) + return json.dumps({"results": results, "count": len(results)}) + + elif action == "reason": + entities = args.get("entities", []) + if not entities: + return json.dumps({"error": "reason requires 'entities' list"}) + results = retriever.reason( + entities, + category=args.get("category"), + limit=int(args.get("limit", 10)), + ) + return json.dumps({"results": results, "count": len(results)}) + + elif action == "contradict": + results = retriever.contradict( + category=args.get("category"), + limit=int(args.get("limit", 10)), + ) + return json.dumps({"results": results, "count": len(results)}) + + elif action == "update": + updated = store.update_fact( + int(args["fact_id"]), + content=args.get("content"), + trust_delta=float(args["trust_delta"]) if "trust_delta" in args else None, + tags=args.get("tags"), + category=args.get("category"), + ) + return json.dumps({"updated": updated}) + + elif action == "remove": + removed = store.remove_fact(int(args["fact_id"])) + return json.dumps({"removed": removed}) + + elif action == "list": + facts = store.list_facts( + category=args.get("category"), + min_trust=float(args.get("min_trust", 0.0)), + limit=int(args.get("limit", 10)), + ) + return json.dumps({"facts": facts, "count": len(facts)}) + + else: + return json.dumps({"error": f"Unknown action: {action}"}) + + except KeyError as exc: + return json.dumps({"error": f"Missing required argument: {exc}"}) + except Exception as exc: + return json.dumps({"error": str(exc)}) + + def _handle_fact_feedback(self, args: dict) -> str: + try: + fact_id = int(args["fact_id"]) + helpful = args["action"] == "helpful" + result = self._store.record_feedback(fact_id, helpful=helpful) + return json.dumps(result) + except KeyError as exc: + return json.dumps({"error": f"Missing required argument: {exc}"}) + except Exception as exc: + return 
json.dumps({"error": str(exc)}) + + # -- Auto-extraction (on_session_end) ------------------------------------ + + def _auto_extract_facts(self, messages: list) -> None: + _PREF_PATTERNS = [ + re.compile(r'\bI\s+(?:prefer|like|love|use|want|need)\s+(.+)', re.IGNORECASE), + re.compile(r'\bmy\s+(?:favorite|preferred|default)\s+\w+\s+is\s+(.+)', re.IGNORECASE), + re.compile(r'\bI\s+(?:always|never|usually)\s+(.+)', re.IGNORECASE), + ] + _DECISION_PATTERNS = [ + re.compile(r'\bwe\s+(?:decided|agreed|chose)\s+(?:to\s+)?(.+)', re.IGNORECASE), + re.compile(r'\bthe\s+project\s+(?:uses|needs|requires)\s+(.+)', re.IGNORECASE), + ] + + extracted = 0 + for msg in messages: + if msg.get("role") != "user": + continue + content = msg.get("content", "") + if not isinstance(content, str) or len(content) < 10: + continue + + for pattern in _PREF_PATTERNS: + if pattern.search(content): + try: + self._store.add_fact(content[:400], category="user_pref") + extracted += 1 + except Exception: + pass + break + + for pattern in _DECISION_PATTERNS: + if pattern.search(content): + try: + self._store.add_fact(content[:400], category="project") + extracted += 1 + except Exception: + pass + break + + if extracted: + logger.info("Auto-extracted %d facts from conversation", extracted) + + +# --------------------------------------------------------------------------- +# Plugin entry point +# --------------------------------------------------------------------------- + +def register(ctx) -> None: + """Register the holographic memory provider with the plugin system.""" + config = _load_plugin_config() + provider = HolographicMemoryProvider(config=config) + ctx.register_memory_provider(provider) diff --git a/plugins/hermes-memory-store/holographic.py b/plugins/hermes-memory-store/holographic.py new file mode 100644 index 00000000000..e1401fde108 --- /dev/null +++ b/plugins/hermes-memory-store/holographic.py @@ -0,0 +1,203 @@ +"""Holographic Reduced Representations (HRR) with phase encoding. 
+ +HRRs are a vector symbolic architecture for encoding compositional structure +into fixed-width distributed representations. This module uses *phase vectors*: +each concept is a vector of angles in [0, 2π). The algebraic operations are: + + bind — circular convolution (phase addition) — associates two concepts + unbind — circular correlation (phase subtraction) — retrieves a bound value + bundle — superposition (circular mean) — merges multiple concepts + +Phase encoding is numerically stable, avoids the magnitude collapse of +traditional complex-number HRRs, and maps cleanly to cosine similarity. + +Atoms are generated deterministically from SHA-256 so representations are +identical across processes, machines, and language versions. + +References: + Plate (1995) — Holographic Reduced Representations + Gayler (2004) — Vector Symbolic Architectures answer Jackendoff's challenges +""" + +import hashlib +import logging +import struct +import math + +try: + import numpy as np + _HAS_NUMPY = True +except ImportError: + _HAS_NUMPY = False + +logger = logging.getLogger(__name__) + +_TWO_PI = 2.0 * math.pi + + +def _require_numpy() -> None: + if not _HAS_NUMPY: + raise RuntimeError("numpy is required for holographic operations") + + +def encode_atom(word: str, dim: int = 1024) -> "np.ndarray": + """Deterministic phase vector via SHA-256 counter blocks. + + Uses hashlib (not numpy RNG) for cross-platform reproducibility. + + Algorithm: + - Generate enough SHA-256 blocks by hashing f"{word}:{i}" for i=0,1,2,... + - Concatenate digests, interpret as uint16 values via struct.unpack + - Scale to [0, 2π): phases = values * (2π / 65536) + - Truncate to dim elements + - Returns np.float64 array of shape (dim,) + """ + _require_numpy() + + # Each SHA-256 digest is 32 bytes = 16 uint16 values. 
+ values_per_block = 16 + blocks_needed = math.ceil(dim / values_per_block) + + uint16_values: list[int] = [] + for i in range(blocks_needed): + digest = hashlib.sha256(f"{word}:{i}".encode()).digest() + uint16_values.extend(struct.unpack("<16H", digest)) + + phases = np.array(uint16_values[:dim], dtype=np.float64) * (_TWO_PI / 65536.0) + return phases + + +def bind(a: "np.ndarray", b: "np.ndarray") -> "np.ndarray": + """Circular convolution = element-wise phase addition. + + Binding associates two concepts into a single composite vector. + The result is dissimilar to both inputs (quasi-orthogonal). + """ + _require_numpy() + return (a + b) % _TWO_PI + + +def unbind(memory: "np.ndarray", key: "np.ndarray") -> "np.ndarray": + """Circular correlation = element-wise phase subtraction. + + Unbinding retrieves the value associated with a key from a memory vector. + unbind(bind(a, b), a) ≈ b (up to superposition noise) + """ + _require_numpy() + return (memory - key) % _TWO_PI + + +def bundle(*vectors: "np.ndarray") -> "np.ndarray": + """Superposition via circular mean of complex exponentials. + + Bundling merges multiple vectors into one that is similar to each input. + The result can hold O(sqrt(dim)) items before similarity degrades. + """ + _require_numpy() + complex_sum = np.sum([np.exp(1j * v) for v in vectors], axis=0) + return np.angle(complex_sum) % _TWO_PI + + +def similarity(a: "np.ndarray", b: "np.ndarray") -> float: + """Phase cosine similarity. Range [-1, 1]. + + Returns 1.0 for identical vectors, near 0.0 for random (unrelated) vectors, + and -1.0 for perfectly anti-correlated vectors. + """ + _require_numpy() + return float(np.mean(np.cos(a - b))) + + +def encode_text(text: str, dim: int = 1024) -> "np.ndarray": + """Bag-of-words: bundle of atom vectors for each token. + + Tokenizes by lowercasing, splitting on whitespace, and stripping + leading/trailing punctuation from each token. + + Returns bundle of all token atom vectors. 
+ If text is empty or produces no tokens, returns encode_atom("__hrr_empty__", dim). + """ + _require_numpy() + + tokens = [ + token.strip(".,!?;:\"'()[]{}") + for token in text.lower().split() + ] + tokens = [t for t in tokens if t] + + if not tokens: + return encode_atom("__hrr_empty__", dim) + + atom_vectors = [encode_atom(token, dim) for token in tokens] + return bundle(*atom_vectors) + + +def encode_fact(content: str, entities: list[str], dim: int = 1024) -> "np.ndarray": + """Structured encoding: content bound to ROLE_CONTENT, each entity bound to ROLE_ENTITY, all bundled. + + Role vectors are reserved atoms: "__hrr_role_content__", "__hrr_role_entity__" + + Components: + 1. bind(encode_text(content, dim), encode_atom("__hrr_role_content__", dim)) + 2. For each entity: bind(encode_atom(entity.lower(), dim), encode_atom("__hrr_role_entity__", dim)) + 3. bundle all components together + + This enables algebraic extraction: + unbind(fact, bind(entity, ROLE_ENTITY)) ≈ content_vector + """ + _require_numpy() + + role_content = encode_atom("__hrr_role_content__", dim) + role_entity = encode_atom("__hrr_role_entity__", dim) + + components: list[np.ndarray] = [ + bind(encode_text(content, dim), role_content) + ] + + for entity in entities: + components.append(bind(encode_atom(entity.lower(), dim), role_entity)) + + return bundle(*components) + + +def phases_to_bytes(phases: "np.ndarray") -> bytes: + """Serialize phase vector to bytes. float64 tobytes — 8 KB at dim=1024.""" + _require_numpy() + return phases.tobytes() + + +def bytes_to_phases(data: bytes) -> "np.ndarray": + """Deserialize bytes back to phase vector. Inverse of phases_to_bytes. + + The .copy() call is required because frombuffer returns a read-only view + backed by the bytes object; callers expect a mutable array. 
+ """ + _require_numpy() + return np.frombuffer(data, dtype=np.float64).copy() + + +def snr_estimate(dim: int, n_items: int) -> float: + """Signal-to-noise ratio estimate for holographic storage. + + SNR = sqrt(dim / n_items) when n_items > 0, else inf. + + The SNR falls below 2.0 when n_items > dim / 4, meaning retrieval + errors become likely. Logs a warning when this threshold is crossed. + """ + _require_numpy() + + if n_items <= 0: + return float("inf") + + snr = math.sqrt(dim / n_items) + + if snr < 2.0: + logger.warning( + "HRR storage near capacity: SNR=%.2f (dim=%d, n_items=%d). " + "Retrieval accuracy may degrade. Consider increasing dim or reducing stored items.", + snr, + dim, + n_items, + ) + + return snr diff --git a/plugins/hermes-memory-store/plugin.yaml b/plugins/hermes-memory-store/plugin.yaml new file mode 100644 index 00000000000..2919cfb822c --- /dev/null +++ b/plugins/hermes-memory-store/plugin.yaml @@ -0,0 +1,6 @@ +name: hermes-memory-store +version: 0.1.0 +description: Structured memory backend with SQLite storage, trust scoring, entity resolution, and hybrid keyword/BM25 retrieval. +author: peppi +hooks: + - on_session_end diff --git a/plugins/hermes-memory-store/retrieval.py b/plugins/hermes-memory-store/retrieval.py new file mode 100644 index 00000000000..69a43954d36 --- /dev/null +++ b/plugins/hermes-memory-store/retrieval.py @@ -0,0 +1,597 @@ +"""Hybrid keyword/BM25 retrieval for the memory store. + +Ported from KIK memory_agent.py — combines FTS5 full-text search with +Jaccard similarity reranking and trust-weighted scoring. +""" + +from __future__ import annotations + +import math +from datetime import datetime, timezone +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .store import MemoryStore + +try: + from . 
import holographic as hrr +except ImportError: + import holographic as hrr # type: ignore[no-redef] + + +class FactRetriever: + """Multi-strategy fact retrieval with trust-weighted scoring.""" + + def __init__( + self, + store: MemoryStore, + temporal_decay_half_life: int = 0, # days, 0 = disabled + fts_weight: float = 0.4, + jaccard_weight: float = 0.3, + hrr_weight: float = 0.3, + hrr_dim: int = 1024, + ): + self.store = store + self.half_life = temporal_decay_half_life + self.hrr_dim = hrr_dim + + # Auto-redistribute weights if numpy unavailable + if hrr_weight > 0 and not hrr._HAS_NUMPY: + fts_weight = 0.6 + jaccard_weight = 0.4 + hrr_weight = 0.0 + + self.fts_weight = fts_weight + self.jaccard_weight = jaccard_weight + self.hrr_weight = hrr_weight + + def search( + self, + query: str, + category: str | None = None, + min_trust: float = 0.3, + limit: int = 10, + ) -> list[dict]: + """Hybrid search: FTS5 candidates → Jaccard rerank → trust weighting. + + Pipeline: + 1. FTS5 search: Get limit*3 candidates from SQLite full-text search + 2. Jaccard boost: Token overlap between query and fact content + 3. Trust weighting: final_score = relevance * trust_score + 4. Temporal decay (optional): decay = 0.5^(age_days / half_life) + + Returns list of dicts with fact data + 'score' field, sorted by score desc. 
+ """ + # Stage 1: Get FTS5 candidates (more than limit for reranking headroom) + candidates = self._fts_candidates(query, category, min_trust, limit * 3) + + if not candidates: + return [] + + # Stage 2: Rerank with Jaccard + trust + optional decay + query_tokens = self._tokenize(query) + scored = [] + + for fact in candidates: + content_tokens = self._tokenize(fact["content"]) + tag_tokens = self._tokenize(fact.get("tags", "")) + all_tokens = content_tokens | tag_tokens + + jaccard = self._jaccard_similarity(query_tokens, all_tokens) + fts_score = fact.get("fts_rank", 0.0) + + # HRR similarity + if self.hrr_weight > 0 and fact.get("hrr_vector"): + fact_vec = hrr.bytes_to_phases(fact["hrr_vector"]) + query_vec = hrr.encode_text(query, self.hrr_dim) + hrr_sim = (hrr.similarity(query_vec, fact_vec) + 1.0) / 2.0 # shift to [0,1] + else: + hrr_sim = 0.5 # neutral + + # Combine FTS5 + Jaccard + HRR + relevance = (self.fts_weight * fts_score + + self.jaccard_weight * jaccard + + self.hrr_weight * hrr_sim) + + # Trust weighting + score = relevance * fact["trust_score"] + + # Optional temporal decay + if self.half_life > 0: + score *= self._temporal_decay(fact.get("updated_at") or fact.get("created_at")) + + fact["score"] = score + scored.append(fact) + + # Sort by score descending, return top limit + scored.sort(key=lambda x: x["score"], reverse=True) + results = scored[:limit] + # Strip raw HRR bytes — callers expect JSON-serializable dicts + for fact in results: + fact.pop("hrr_vector", None) + return results + + def probe( + self, + entity: str, + category: str | None = None, + limit: int = 10, + ) -> list[dict]: + """Compositional entity query using HRR algebra. + + Unbinds entity from memory bank to extract associated content. + This is NOT keyword search — it uses algebraic structure to find facts + where the entity plays a structural role. + + Falls back to FTS5 search if numpy unavailable. 
+ """ + if not hrr._HAS_NUMPY: + # Fallback to keyword search on entity name + return self.search(entity, category=category, limit=limit) + + conn = self.store._conn + + # Encode entity as role-bound vector + role_entity = hrr.encode_atom("__hrr_role_entity__", self.hrr_dim) + entity_vec = hrr.encode_atom(entity.lower(), self.hrr_dim) + probe_key = hrr.bind(entity_vec, role_entity) + + # Try category-specific bank first, then all facts + if category: + bank_name = f"cat:{category}" + bank_row = conn.execute( + "SELECT vector FROM memory_banks WHERE bank_name = ?", + (bank_name,), + ).fetchone() + if bank_row: + bank_vec = hrr.bytes_to_phases(bank_row["vector"]) + extracted = hrr.unbind(bank_vec, probe_key) + # Use extracted signal to score individual facts + return self._score_facts_by_vector( + extracted, category=category, limit=limit + ) + + # Score against individual fact vectors directly + where = "WHERE hrr_vector IS NOT NULL" + params: list = [] + if category: + where += " AND category = ?" 
+ params.append(category) + + rows = conn.execute( + f""" + SELECT fact_id, content, category, tags, trust_score, + retrieval_count, helpful_count, created_at, updated_at, + hrr_vector + FROM facts + {where} + """, + params, + ).fetchall() + + if not rows: + # Final fallback: keyword search + return self.search(entity, category=category, limit=limit) + + scored = [] + for row in rows: + fact = dict(row) + fact_vec = hrr.bytes_to_phases(fact.pop("hrr_vector")) + # Unbind probe key from fact to see if entity is structurally present + residual = hrr.unbind(fact_vec, probe_key) + # Compare residual against content signal + role_content = hrr.encode_atom("__hrr_role_content__", self.hrr_dim) + content_vec = hrr.bind(hrr.encode_text(fact["content"], self.hrr_dim), role_content) + sim = hrr.similarity(residual, content_vec) + fact["score"] = (sim + 1.0) / 2.0 * fact["trust_score"] + scored.append(fact) + + scored.sort(key=lambda x: x["score"], reverse=True) + return scored[:limit] + + def related( + self, + entity: str, + category: str | None = None, + limit: int = 10, + ) -> list[dict]: + """Discover facts that share structural connections with an entity. + + Unlike probe (which finds facts *about* an entity), related finds + facts that are connected through shared context — e.g., other entities + mentioned alongside this one, or content that overlaps structurally. + + Falls back to FTS5 search if numpy unavailable. + """ + if not hrr._HAS_NUMPY: + return self.search(entity, category=category, limit=limit) + + conn = self.store._conn + + # Encode entity as a bare atom (not role-bound — we want ANY structural match) + entity_vec = hrr.encode_atom(entity.lower(), self.hrr_dim) + + # Get all facts with vectors + where = "WHERE hrr_vector IS NOT NULL" + params: list = [] + if category: + where += " AND category = ?" 
+ params.append(category) + + rows = conn.execute( + f""" + SELECT fact_id, content, category, tags, trust_score, + retrieval_count, helpful_count, created_at, updated_at, + hrr_vector + FROM facts + {where} + """, + params, + ).fetchall() + + if not rows: + return self.search(entity, category=category, limit=limit) + + # Score each fact by how much the entity's atom appears in its vector + # This catches both role-bound entity matches AND content word matches + scored = [] + for row in rows: + fact = dict(row) + fact_vec = hrr.bytes_to_phases(fact.pop("hrr_vector")) + + # Check structural similarity: unbind entity from fact + residual = hrr.unbind(fact_vec, entity_vec) + # A high-similarity residual to ANY known role vector means this entity + # plays a structural role in the fact + role_entity = hrr.encode_atom("__hrr_role_entity__", self.hrr_dim) + role_content = hrr.encode_atom("__hrr_role_content__", self.hrr_dim) + + entity_role_sim = hrr.similarity(residual, role_entity) + content_role_sim = hrr.similarity(residual, role_content) + # Take the max — entity could appear in either role + best_sim = max(entity_role_sim, content_role_sim) + + fact["score"] = (best_sim + 1.0) / 2.0 * fact["trust_score"] + scored.append(fact) + + scored.sort(key=lambda x: x["score"], reverse=True) + return scored[:limit] + + def reason( + self, + entities: list[str], + category: str | None = None, + limit: int = 10, + ) -> list[dict]: + """Multi-entity compositional query — vector-space JOIN. + + Given multiple entities, algebraically intersects their structural + connections to find facts related to ALL of them simultaneously. + This is compositional reasoning that no embedding DB can do. + + Example: reason(["peppi", "backend"]) finds facts where peppi AND + backend both play structural roles — without keyword matching. + + Falls back to FTS5 search if numpy unavailable. 
+ """ + if not hrr._HAS_NUMPY or not entities: + # Fallback: search with all entities as keywords + query = " ".join(entities) + return self.search(query, category=category, limit=limit) + + conn = self.store._conn + role_entity = hrr.encode_atom("__hrr_role_entity__", self.hrr_dim) + + # For each entity, compute what the bank "remembers" about it + # by unbinding entity+role from each fact vector + entity_residuals = [] + for entity in entities: + entity_vec = hrr.encode_atom(entity.lower(), self.hrr_dim) + probe_key = hrr.bind(entity_vec, role_entity) + entity_residuals.append(probe_key) + + # The intersection key: bundle all probe keys, then use it to find + # facts that are structurally connected to ALL entities + intersection_key = hrr.bundle(*entity_residuals) if len(entity_residuals) > 1 else entity_residuals[0] + + # Get all facts with vectors + where = "WHERE hrr_vector IS NOT NULL" + params: list = [] + if category: + where += " AND category = ?" + params.append(category) + + rows = conn.execute( + f""" + SELECT fact_id, content, category, tags, trust_score, + retrieval_count, helpful_count, created_at, updated_at, + hrr_vector + FROM facts + {where} + """, + params, + ).fetchall() + + if not rows: + query = " ".join(entities) + return self.search(query, category=category, limit=limit) + + # Score each fact: unbind the intersection key and check if the + # residual is coherent (high self-similarity = structured match) + scored = [] + for row in rows: + fact = dict(row) + fact_vec = hrr.bytes_to_phases(fact.pop("hrr_vector")) + + # Unbind intersection key from fact + residual = hrr.unbind(fact_vec, intersection_key) + + # Score by how much EACH entity is present in this fact + # A fact scores high only if ALL entities have structural presence + entity_scores = [] + for entity in entities: + entity_vec = hrr.encode_atom(entity.lower(), self.hrr_dim) + probe_key = hrr.bind(entity_vec, role_entity) + single_residual = hrr.unbind(fact_vec, probe_key) + # Check 
residual against content role (does this entity participate?) + role_content = hrr.encode_atom("__hrr_role_content__", self.hrr_dim) + sim = hrr.similarity(single_residual, role_content) + entity_scores.append(sim) + + # Use minimum score — fact must match ALL entities, not just some + # This is the AND semantics (vs OR which would use mean/max) + min_sim = min(entity_scores) + fact["score"] = (min_sim + 1.0) / 2.0 * fact["trust_score"] + scored.append(fact) + + scored.sort(key=lambda x: x["score"], reverse=True) + return scored[:limit] + + def contradict( + self, + category: str | None = None, + threshold: float = 0.3, + limit: int = 10, + ) -> list[dict]: + """Find potentially contradictory facts via entity overlap + content divergence. + + Two facts contradict when they share entities (same subject) but have + low content-vector similarity (different claims). This is automated + memory hygiene — no other memory system does this. + + Returns pairs of facts with a contradiction score. + Falls back to empty list if numpy unavailable. + """ + if not hrr._HAS_NUMPY: + return [] + + conn = self.store._conn + + # Get all facts with vectors and their linked entities + where = "WHERE f.hrr_vector IS NOT NULL" + params: list = [] + if category: + where += " AND f.category = ?" + params.append(category) + + rows = conn.execute( + f""" + SELECT f.fact_id, f.content, f.category, f.tags, f.trust_score, + f.created_at, f.updated_at, f.hrr_vector + FROM facts f + {where} + """, + params, + ).fetchall() + + if len(rows) < 2: + return [] + + # Build entity sets per fact + fact_entities: dict[int, set[str]] = {} + for row in rows: + fid = row["fact_id"] + entity_rows = conn.execute( + """ + SELECT e.name FROM entities e + JOIN fact_entities fe ON fe.entity_id = e.entity_id + WHERE fe.fact_id = ? 
+ """, + (fid,), + ).fetchall() + fact_entities[fid] = {r["name"].lower() for r in entity_rows} + + # Compare all pairs: high entity overlap + low content similarity = contradiction + facts = [dict(r) for r in rows] + contradictions = [] + + for i in range(len(facts)): + for j in range(i + 1, len(facts)): + f1, f2 = facts[i], facts[j] + ents1 = fact_entities.get(f1["fact_id"], set()) + ents2 = fact_entities.get(f2["fact_id"], set()) + + if not ents1 or not ents2: + continue + + # Entity overlap (Jaccard) + entity_overlap = len(ents1 & ents2) / len(ents1 | ents2) if (ents1 | ents2) else 0.0 + + if entity_overlap < 0.3: + continue # Not enough entity overlap to be contradictory + + # Content similarity via HRR vectors + v1 = hrr.bytes_to_phases(f1["hrr_vector"]) + v2 = hrr.bytes_to_phases(f2["hrr_vector"]) + content_sim = hrr.similarity(v1, v2) + + # High entity overlap + low content similarity = potential contradiction + # contradiction_score: higher = more contradictory + contradiction_score = entity_overlap * (1.0 - (content_sim + 1.0) / 2.0) + + if contradiction_score >= threshold: + # Strip hrr_vector from output (not JSON serializable) + f1_clean = {k: v for k, v in f1.items() if k != "hrr_vector"} + f2_clean = {k: v for k, v in f2.items() if k != "hrr_vector"} + contradictions.append({ + "fact_a": f1_clean, + "fact_b": f2_clean, + "entity_overlap": round(entity_overlap, 3), + "content_similarity": round(content_sim, 3), + "contradiction_score": round(contradiction_score, 3), + "shared_entities": sorted(ents1 & ents2), + }) + + contradictions.sort(key=lambda x: x["contradiction_score"], reverse=True) + return contradictions[:limit] + + def _score_facts_by_vector( + self, + target_vec: "np.ndarray", + category: str | None = None, + limit: int = 10, + ) -> list[dict]: + """Score facts by similarity to a target vector.""" + conn = self.store._conn + + where = "WHERE hrr_vector IS NOT NULL" + params: list = [] + if category: + where += " AND category = ?" 
+ params.append(category) + + rows = conn.execute( + f""" + SELECT fact_id, content, category, tags, trust_score, + retrieval_count, helpful_count, created_at, updated_at, + hrr_vector + FROM facts + {where} + """, + params, + ).fetchall() + + scored = [] + for row in rows: + fact = dict(row) + fact_vec = hrr.bytes_to_phases(fact.pop("hrr_vector")) + sim = hrr.similarity(target_vec, fact_vec) + fact["score"] = (sim + 1.0) / 2.0 * fact["trust_score"] + scored.append(fact) + + scored.sort(key=lambda x: x["score"], reverse=True) + return scored[:limit] + + def _fts_candidates( + self, + query: str, + category: str | None, + min_trust: float, + limit: int, + ) -> list[dict]: + """Get raw FTS5 candidates from the store. + + Uses the store's database connection directly for FTS5 MATCH + with rank scoring. Normalizes FTS5 rank to [0, 1] range. + """ + conn = self.store._conn + + # Build query - FTS5 rank is negative (lower = better match) + # We need to join facts_fts with facts to get all columns + params: list = [] + where_clauses = ["facts_fts MATCH ?"] + params.append(query) + + if category: + where_clauses.append("f.category = ?") + params.append(category) + + where_clauses.append("f.trust_score >= ?") + params.append(min_trust) + + where_sql = " AND ".join(where_clauses) + + sql = f""" + SELECT f.*, facts_fts.rank as fts_rank_raw + FROM facts_fts + JOIN facts f ON f.fact_id = facts_fts.rowid + WHERE {where_sql} + ORDER BY facts_fts.rank + LIMIT ? 
+ """ + params.append(limit) + + try: + rows = conn.execute(sql, params).fetchall() + except Exception: + # FTS5 MATCH can fail on malformed queries — fall back to empty + return [] + + if not rows: + return [] + + # Normalize FTS5 rank: rank is negative, lower = better + # Convert to positive score in [0, 1] range + raw_ranks = [abs(row["fts_rank_raw"]) for row in rows] + max_rank = max(raw_ranks) if raw_ranks else 1.0 + max_rank = max(max_rank, 1e-6) # avoid div by zero + + results = [] + for row, raw_rank in zip(rows, raw_ranks): + fact = dict(row) + fact.pop("fts_rank_raw", None) + fact["fts_rank"] = raw_rank / max_rank # normalize to [0, 1] + results.append(fact) + + return results + + @staticmethod + def _tokenize(text: str) -> set[str]: + """Simple whitespace tokenization with lowercasing. + + Strips common punctuation. No stemming/lemmatization (Phase 1). + """ + if not text: + return set() + # Split on whitespace, lowercase, strip punctuation + tokens = set() + for word in text.lower().split(): + cleaned = word.strip(".,;:!?\"'()[]{}#@<>") + if cleaned: + tokens.add(cleaned) + return tokens + + @staticmethod + def _jaccard_similarity(set_a: set, set_b: set) -> float: + """Jaccard similarity coefficient: |A ∩ B| / |A ∪ B|.""" + if not set_a or not set_b: + return 0.0 + intersection = len(set_a & set_b) + union = len(set_a | set_b) + return intersection / union if union > 0 else 0.0 + + def _temporal_decay(self, timestamp_str: str | None) -> float: + """Exponential decay: 0.5^(age_days / half_life_days). + + Returns 1.0 if decay is disabled or timestamp is missing. 
+ """ + if not self.half_life or not timestamp_str: + return 1.0 + + try: + if isinstance(timestamp_str, str): + # Parse ISO format timestamp from SQLite + ts = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")) + else: + ts = timestamp_str + + if ts.tzinfo is None: + ts = ts.replace(tzinfo=timezone.utc) + + age_days = (datetime.now(timezone.utc) - ts).total_seconds() / 86400 + if age_days < 0: + return 1.0 + + return math.pow(0.5, age_days / self.half_life) + except (ValueError, TypeError): + return 1.0 diff --git a/plugins/hermes-memory-store/store.py b/plugins/hermes-memory-store/store.py new file mode 100644 index 00000000000..e1724f10fb0 --- /dev/null +++ b/plugins/hermes-memory-store/store.py @@ -0,0 +1,572 @@ +""" +SQLite-backed fact store with entity resolution and trust scoring. +Single-user Hermes memory store plugin. +""" + +import re +import sqlite3 +import threading +from datetime import datetime +from pathlib import Path + +try: + from . import holographic as hrr +except ImportError: + import holographic as hrr # type: ignore[no-redef] + +_SCHEMA = """ +CREATE TABLE IF NOT EXISTS facts ( + fact_id INTEGER PRIMARY KEY AUTOINCREMENT, + content TEXT NOT NULL UNIQUE, + category TEXT DEFAULT 'general', + tags TEXT DEFAULT '', + trust_score REAL DEFAULT 0.5, + retrieval_count INTEGER DEFAULT 0, + helpful_count INTEGER DEFAULT 0, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + hrr_vector BLOB +); + +CREATE TABLE IF NOT EXISTS entities ( + entity_id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + entity_type TEXT DEFAULT 'unknown', + aliases TEXT DEFAULT '', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + +CREATE TABLE IF NOT EXISTS fact_entities ( + fact_id INTEGER REFERENCES facts(fact_id), + entity_id INTEGER REFERENCES entities(entity_id), + PRIMARY KEY (fact_id, entity_id) +); + +CREATE INDEX IF NOT EXISTS idx_facts_trust ON facts(trust_score DESC); +CREATE INDEX IF NOT 
EXISTS idx_facts_category ON facts(category); +CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name); + +CREATE VIRTUAL TABLE IF NOT EXISTS facts_fts + USING fts5(content, tags, content=facts, content_rowid=fact_id); + +CREATE TRIGGER IF NOT EXISTS facts_ai AFTER INSERT ON facts BEGIN + INSERT INTO facts_fts(rowid, content, tags) + VALUES (new.fact_id, new.content, new.tags); +END; + +CREATE TRIGGER IF NOT EXISTS facts_ad AFTER DELETE ON facts BEGIN + INSERT INTO facts_fts(facts_fts, rowid, content, tags) + VALUES ('delete', old.fact_id, old.content, old.tags); +END; + +CREATE TRIGGER IF NOT EXISTS facts_au AFTER UPDATE ON facts BEGIN + INSERT INTO facts_fts(facts_fts, rowid, content, tags) + VALUES ('delete', old.fact_id, old.content, old.tags); + INSERT INTO facts_fts(rowid, content, tags) + VALUES (new.fact_id, new.content, new.tags); +END; + +CREATE TABLE IF NOT EXISTS memory_banks ( + bank_id INTEGER PRIMARY KEY AUTOINCREMENT, + bank_name TEXT NOT NULL UNIQUE, + vector BLOB NOT NULL, + dim INTEGER NOT NULL, + fact_count INTEGER DEFAULT 0, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); +""" + +# Trust adjustment constants +_HELPFUL_DELTA = 0.05 +_UNHELPFUL_DELTA = -0.10 +_TRUST_MIN = 0.0 +_TRUST_MAX = 1.0 + +# Entity extraction patterns +_RE_CAPITALIZED = re.compile(r'\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\b') +_RE_DOUBLE_QUOTE = re.compile(r'"([^"]+)"') +_RE_SINGLE_QUOTE = re.compile(r"'([^']+)'") +_RE_AKA = re.compile( + r'(\w+(?:\s+\w+)*)\s+(?:aka|also known as)\s+(\w+(?:\s+\w+)*)', + re.IGNORECASE, +) + + +def _clamp_trust(value: float) -> float: + return max(_TRUST_MIN, min(_TRUST_MAX, value)) + + +class MemoryStore: + """SQLite-backed fact store with entity resolution and trust scoring.""" + + def __init__( + self, + db_path: "str | Path" = "~/.hermes/memory_store.db", + default_trust: float = 0.5, + hrr_dim: int = 1024, + ) -> None: + self.db_path = Path(db_path).expanduser() + self.db_path.parent.mkdir(parents=True, exist_ok=True) + 
self.default_trust = _clamp_trust(default_trust) + self.hrr_dim = hrr_dim + self._hrr_available = hrr._HAS_NUMPY + self._conn: sqlite3.Connection = sqlite3.connect( + str(self.db_path), + check_same_thread=False, + timeout=10.0, + ) + self._lock = threading.RLock() + self._conn.row_factory = sqlite3.Row + self._init_db() + + # ------------------------------------------------------------------ + # Initialisation + # ------------------------------------------------------------------ + + def _init_db(self) -> None: + """Create tables, indexes, and triggers if they do not exist. Enable WAL mode.""" + self._conn.execute("PRAGMA journal_mode=WAL") + self._conn.executescript(_SCHEMA) + # Migrate: add hrr_vector column if missing (safe for existing databases) + columns = {row[1] for row in self._conn.execute("PRAGMA table_info(facts)").fetchall()} + if "hrr_vector" not in columns: + self._conn.execute("ALTER TABLE facts ADD COLUMN hrr_vector BLOB") + self._conn.commit() + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def add_fact( + self, + content: str, + category: str = "general", + tags: str = "", + ) -> int: + """Insert a fact and return its fact_id. + + Deduplicates by content (UNIQUE constraint). On duplicate, returns + the existing fact_id without modifying the row. Extracts entities from + the content and links them to the fact. + """ + with self._lock: + content = content.strip() + if not content: + raise ValueError("content must not be empty") + + try: + cur = self._conn.execute( + """ + INSERT INTO facts (content, category, tags, trust_score) + VALUES (?, ?, ?, ?) 
+ """, + (content, category, tags, self.default_trust), + ) + self._conn.commit() + fact_id: int = cur.lastrowid # type: ignore[assignment] + except sqlite3.IntegrityError: + # Duplicate content — return existing id + row = self._conn.execute( + "SELECT fact_id FROM facts WHERE content = ?", (content,) + ).fetchone() + return int(row["fact_id"]) + + # Entity extraction and linking + for name in self._extract_entities(content): + entity_id = self._resolve_entity(name) + self._link_fact_entity(fact_id, entity_id) + + # Compute HRR vector after entity linking + self._compute_hrr_vector(fact_id, content) + self._rebuild_bank(category) + + return fact_id + + def search_facts( + self, + query: str, + category: str | None = None, + min_trust: float = 0.3, + limit: int = 10, + ) -> list[dict]: + """Full-text search over facts using FTS5. + + Returns a list of fact dicts ordered by FTS5 rank, then trust_score + descending. Also increments retrieval_count for matched facts. + """ + with self._lock: + query = query.strip() + if not query: + return [] + + params: list = [query, min_trust] + category_clause = "" + if category is not None: + category_clause = "AND f.category = ?" + params.append(category) + params.append(limit) + + sql = f""" + SELECT f.fact_id, f.content, f.category, f.tags, + f.trust_score, f.retrieval_count, f.helpful_count, + f.created_at, f.updated_at + FROM facts f + JOIN facts_fts fts ON fts.rowid = f.fact_id + WHERE facts_fts MATCH ? + AND f.trust_score >= ? + {category_clause} + ORDER BY fts.rank, f.trust_score DESC + LIMIT ? + """ + + rows = self._conn.execute(sql, params).fetchall() + results = [self._row_to_dict(r) for r in rows] + + if results: + ids = [r["fact_id"] for r in results] + placeholders = ",".join("?" 
* len(ids)) + self._conn.execute( + f"UPDATE facts SET retrieval_count = retrieval_count + 1 WHERE fact_id IN ({placeholders})", + ids, + ) + self._conn.commit() + + return results + + def update_fact( + self, + fact_id: int, + content: str | None = None, + trust_delta: float | None = None, + tags: str | None = None, + category: str | None = None, + ) -> bool: + """Partially update a fact. Trust is clamped to [0, 1]. + + Returns True if the row existed, False otherwise. + """ + with self._lock: + row = self._conn.execute( + "SELECT fact_id, trust_score FROM facts WHERE fact_id = ?", (fact_id,) + ).fetchone() + if row is None: + return False + + assignments: list[str] = ["updated_at = CURRENT_TIMESTAMP"] + params: list = [] + + if content is not None: + assignments.append("content = ?") + params.append(content.strip()) + if tags is not None: + assignments.append("tags = ?") + params.append(tags) + if category is not None: + assignments.append("category = ?") + params.append(category) + if trust_delta is not None: + new_trust = _clamp_trust(row["trust_score"] + trust_delta) + assignments.append("trust_score = ?") + params.append(new_trust) + + params.append(fact_id) + self._conn.execute( + f"UPDATE facts SET {', '.join(assignments)} WHERE fact_id = ?", + params, + ) + self._conn.commit() + + # If content changed, re-extract entities + if content is not None: + self._conn.execute( + "DELETE FROM fact_entities WHERE fact_id = ?", (fact_id,) + ) + for name in self._extract_entities(content): + entity_id = self._resolve_entity(name) + self._link_fact_entity(fact_id, entity_id) + self._conn.commit() + + # Recompute HRR vector if content changed + if content is not None: + self._compute_hrr_vector(fact_id, content) + # Rebuild bank for relevant category + cat = category or self._conn.execute( + "SELECT category FROM facts WHERE fact_id = ?", (fact_id,) + ).fetchone()["category"] + self._rebuild_bank(cat) + + return True + + def remove_fact(self, fact_id: int) -> bool: + 
"""Delete a fact and its entity links. Returns True if the row existed.""" + with self._lock: + row = self._conn.execute( + "SELECT fact_id, category FROM facts WHERE fact_id = ?", (fact_id,) + ).fetchone() + if row is None: + return False + + self._conn.execute( + "DELETE FROM fact_entities WHERE fact_id = ?", (fact_id,) + ) + self._conn.execute("DELETE FROM facts WHERE fact_id = ?", (fact_id,)) + self._conn.commit() + self._rebuild_bank(row["category"]) + return True + + def list_facts( + self, + category: str | None = None, + min_trust: float = 0.0, + limit: int = 50, + ) -> list[dict]: + """Browse facts ordered by trust_score descending. + + Optionally filter by category and minimum trust score. + """ + with self._lock: + params: list = [min_trust] + category_clause = "" + if category is not None: + category_clause = "AND category = ?" + params.append(category) + params.append(limit) + + sql = f""" + SELECT fact_id, content, category, tags, trust_score, + retrieval_count, helpful_count, created_at, updated_at + FROM facts + WHERE trust_score >= ? + {category_clause} + ORDER BY trust_score DESC + LIMIT ? + """ + rows = self._conn.execute(sql, params).fetchall() + return [self._row_to_dict(r) for r in rows] + + def record_feedback(self, fact_id: int, helpful: bool) -> dict: + """Record user feedback and adjust trust asymmetrically. + + helpful=True -> trust += 0.05, helpful_count += 1 + helpful=False -> trust -= 0.10 + + Returns a dict with fact_id, old_trust, new_trust, helpful_count. + Raises KeyError if fact_id does not exist. 
+ """ + with self._lock: + row = self._conn.execute( + "SELECT fact_id, trust_score, helpful_count FROM facts WHERE fact_id = ?", + (fact_id,), + ).fetchone() + if row is None: + raise KeyError(f"fact_id {fact_id} not found") + + old_trust: float = row["trust_score"] + delta = _HELPFUL_DELTA if helpful else _UNHELPFUL_DELTA + new_trust = _clamp_trust(old_trust + delta) + + helpful_increment = 1 if helpful else 0 + self._conn.execute( + """ + UPDATE facts + SET trust_score = ?, + helpful_count = helpful_count + ?, + updated_at = CURRENT_TIMESTAMP + WHERE fact_id = ? + """, + (new_trust, helpful_increment, fact_id), + ) + self._conn.commit() + + return { + "fact_id": fact_id, + "old_trust": old_trust, + "new_trust": new_trust, + "helpful_count": row["helpful_count"] + helpful_increment, + } + + # ------------------------------------------------------------------ + # Entity helpers + # ------------------------------------------------------------------ + + def _extract_entities(self, text: str) -> list[str]: + """Extract entity candidates from text using simple regex rules. + + Rules applied (in order): + 1. Capitalized multi-word phrases e.g. "John Doe" + 2. Double-quoted terms e.g. "Python" + 3. Single-quoted terms e.g. 'pytest' + 4. AKA patterns e.g. "Guido aka BDFL" -> two entities + + Returns a deduplicated list preserving first-seen order. 
+ """ + seen: set[str] = set() + candidates: list[str] = [] + + def _add(name: str) -> None: + stripped = name.strip() + if stripped and stripped.lower() not in seen: + seen.add(stripped.lower()) + candidates.append(stripped) + + for m in _RE_CAPITALIZED.finditer(text): + _add(m.group(1)) + + for m in _RE_DOUBLE_QUOTE.finditer(text): + _add(m.group(1)) + + for m in _RE_SINGLE_QUOTE.finditer(text): + _add(m.group(1)) + + for m in _RE_AKA.finditer(text): + _add(m.group(1)) + _add(m.group(2)) + + return candidates + + def _resolve_entity(self, name: str) -> int: + """Find an existing entity by name or alias (case-insensitive) or create one. + + Returns the entity_id. + """ + # Exact name match + row = self._conn.execute( + "SELECT entity_id FROM entities WHERE name LIKE ?", (name,) + ).fetchone() + if row is not None: + return int(row["entity_id"]) + + # Search aliases — aliases stored as comma-separated; use LIKE with % boundaries + alias_row = self._conn.execute( + """ + SELECT entity_id FROM entities + WHERE ',' || aliases || ',' LIKE '%,' || ? || ',%' + """, + (name,), + ).fetchone() + if alias_row is not None: + return int(alias_row["entity_id"]) + + # Create new entity + cur = self._conn.execute( + "INSERT INTO entities (name) VALUES (?)", (name,) + ) + self._conn.commit() + return int(cur.lastrowid) # type: ignore[return-value] + + def _link_fact_entity(self, fact_id: int, entity_id: int) -> None: + """Insert into fact_entities, silently ignore if the link already exists.""" + self._conn.execute( + """ + INSERT OR IGNORE INTO fact_entities (fact_id, entity_id) + VALUES (?, ?) + """, + (fact_id, entity_id), + ) + self._conn.commit() + + def _compute_hrr_vector(self, fact_id: int, content: str) -> None: + """Compute and store HRR vector for a fact. 
        No-op if numpy unavailable."""
        with self._lock:
            if not self._hrr_available:
                return

            # Entities already linked through fact_entities contribute the
            # role-bound portion of the fact's holographic encoding, so this
            # must run *after* entity linking.
            rows = self._conn.execute(
                """
                SELECT e.name FROM entities e
                JOIN fact_entities fe ON fe.entity_id = e.entity_id
                WHERE fe.fact_id = ?
                """,
                (fact_id,),
            ).fetchall()
            entities = [row["name"] for row in rows]

            vector = hrr.encode_fact(content, entities, self.hrr_dim)
            self._conn.execute(
                "UPDATE facts SET hrr_vector = ? WHERE fact_id = ?",
                (hrr.phases_to_bytes(vector), fact_id),
            )
            self._conn.commit()

    def _rebuild_bank(self, category: str) -> None:
        """Full rebuild of a category's memory bank from all its fact vectors.

        Drops the bank row entirely when the category has no vectorized
        facts; otherwise upserts the bundled vector. No-op without numpy.
        """
        with self._lock:
            if not self._hrr_available:
                return

            bank_name = f"cat:{category}"
            rows = self._conn.execute(
                "SELECT hrr_vector FROM facts WHERE category = ? AND hrr_vector IS NOT NULL",
                (category,),
            ).fetchall()

            if not rows:
                # Nothing left to bundle — remove any stale bank row.
                self._conn.execute("DELETE FROM memory_banks WHERE bank_name = ?", (bank_name,))
                self._conn.commit()
                return

            vectors = [hrr.bytes_to_phases(row["hrr_vector"]) for row in rows]
            bank_vector = hrr.bundle(*vectors)
            fact_count = len(vectors)

            # Check SNR — return value intentionally discarded: snr_estimate
            # logs a warning as a side effect when the bank nears capacity.
            hrr.snr_estimate(self.hrr_dim, fact_count)

            self._conn.execute(
                """
                INSERT INTO memory_banks (bank_name, vector, dim, fact_count, updated_at)
                VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
                ON CONFLICT(bank_name) DO UPDATE SET
                    vector = excluded.vector,
                    dim = excluded.dim,
                    fact_count = excluded.fact_count,
                    updated_at = excluded.updated_at
                """,
                (bank_name, hrr.phases_to_bytes(bank_vector), self.hrr_dim, fact_count),
            )
            self._conn.commit()

    def rebuild_all_vectors(self, dim: int | None = None) -> int:
        """Recompute all HRR vectors + banks from text. For recovery/migration.

        Returns the number of facts processed.
+ """ + with self._lock: + if not self._hrr_available: + return 0 + + if dim is not None: + self.hrr_dim = dim + + rows = self._conn.execute( + "SELECT fact_id, content, category FROM facts" + ).fetchall() + + categories: set[str] = set() + for row in rows: + self._compute_hrr_vector(row["fact_id"], row["content"]) + categories.add(row["category"]) + + for category in categories: + self._rebuild_bank(category) + + return len(rows) + + # ------------------------------------------------------------------ + # Utilities + # ------------------------------------------------------------------ + + def _row_to_dict(self, row: sqlite3.Row) -> dict: + """Convert a sqlite3.Row to a plain dict.""" + return dict(row) + + def close(self) -> None: + """Close the database connection.""" + self._conn.close() + + def __enter__(self) -> "MemoryStore": + return self + + def __exit__(self, *_: object) -> None: + self.close() diff --git a/tests/plugins/__init__.py b/tests/plugins/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/plugins/test_holographic.py b/tests/plugins/test_holographic.py new file mode 100644 index 00000000000..2ae259646b4 --- /dev/null +++ b/tests/plugins/test_holographic.py @@ -0,0 +1,248 @@ +"""Tests for holographic.py — pure HRR math operations. + +All tests are synthetic: no filesystem, no database, no external state. 
+""" + +import sys +from pathlib import Path +from unittest.mock import patch + +import numpy as np +import pytest + +# Plugin path: prefer home dir install, fall back to in-repo copy +_plugin_dir = Path.home() / ".hermes" / "plugins" / "hermes-memory-store" +if not _plugin_dir.exists(): + _plugin_dir = Path(__file__).resolve().parent.parent.parent / "plugins" / "hermes-memory-store" +sys.path.insert(0, str(_plugin_dir)) + +from holographic import ( + _HAS_NUMPY, + bind, + bundle, + bytes_to_phases, + encode_atom, + encode_fact, + encode_text, + phases_to_bytes, + similarity, + snr_estimate, + unbind, +) + + +DIM = 256 # Smaller dim for fast tests; math properties hold at any dim. + + +class TestEncodeAtom: + def test_deterministic(self): + """Same input always produces the identical vector.""" + v1 = encode_atom("hello", DIM) + v2 = encode_atom("hello", DIM) + np.testing.assert_array_equal(v1, v2) + + def test_shape_and_dtype(self): + v = encode_atom("test", DIM) + assert v.shape == (DIM,) + assert v.dtype == np.float64 + + def test_phase_range(self): + """All phases must be in [0, 2π).""" + v = encode_atom("range_check", DIM) + assert np.all(v >= 0.0) + assert np.all(v < 2.0 * np.pi) + + def test_near_orthogonal(self): + """Random unrelated words should have near-zero similarity.""" + words = ["apple", "quantum", "bicycle", "telescope", "jazz"] + vectors = [encode_atom(w, DIM) for w in words] + for i in range(len(vectors)): + for j in range(i + 1, len(vectors)): + sim = similarity(vectors[i], vectors[j]) + assert abs(sim) < 0.15, f"'{words[i]}' vs '{words[j]}': sim={sim:.4f}" + + +class TestBindUnbind: + def test_roundtrip(self): + """unbind(bind(a, b), b) should recover a exactly.""" + a = encode_atom("concept_a", DIM) + b = encode_atom("concept_b", DIM) + bound = bind(a, b) + recovered = unbind(bound, b) + np.testing.assert_allclose(recovered, a, atol=1e-10) + + def test_commutative(self): + """bind(a, b) == bind(b, a) — phase addition is commutative.""" + a = 
encode_atom("alpha", DIM) + b = encode_atom("beta", DIM) + np.testing.assert_allclose(bind(a, b), bind(b, a), atol=1e-10) + + def test_bound_dissimilar_to_inputs(self): + """The bound vector should be quasi-orthogonal to both inputs.""" + a = encode_atom("dog", DIM) + b = encode_atom("cat", DIM) + bound = bind(a, b) + assert abs(similarity(bound, a)) < 0.15 + assert abs(similarity(bound, b)) < 0.15 + + +class TestBundle: + def test_preserves_similarity(self): + """Bundled vector should be similar to each of its components.""" + vecs = [encode_atom(f"item_{i}", DIM) for i in range(3)] + bundled = bundle(*vecs) + for v in vecs: + sim = similarity(bundled, v) + assert sim > 0.2, f"Bundle lost signal: sim={sim:.4f}" + + def test_capacity_degrades(self): + """Similarity to each component should decrease as more items are added.""" + target = encode_atom("target", DIM) + sims = [] + for n in [2, 5, 10, 20]: + others = [encode_atom(f"noise_{i}", DIM) for i in range(n - 1)] + bundled = bundle(target, *others) + sims.append(similarity(bundled, target)) + # Similarity should generally decrease (allow minor non-monotonicity) + assert sims[0] > sims[-1], f"No degradation: {sims}" + + +class TestSimilarity: + def test_identity(self): + """similarity(a, a) should be exactly 1.0.""" + a = encode_atom("self", DIM) + assert similarity(a, a) == pytest.approx(1.0) + + def test_orthogonal_near_zero(self): + """Random vectors should have similarity near 0.""" + sims = [] + for i in range(10): + a = encode_atom(f"rand_a_{i}", DIM) + b = encode_atom(f"rand_b_{i}", DIM) + sims.append(similarity(a, b)) + mean_sim = np.mean(sims) + assert abs(mean_sim) < 0.1, f"Mean similarity too high: {mean_sim:.4f}" + + +class TestEncodeText: + def test_order_invariant(self): + """Bag-of-words should be order-invariant.""" + v1 = encode_text("the quick brown fox", DIM) + v2 = encode_text("fox brown quick the", DIM) + sim = similarity(v1, v2) + assert sim == pytest.approx(1.0, abs=1e-10) + + def 
test_similar_texts_high_similarity(self): + """Texts sharing words should have high similarity.""" + v1 = encode_text("the cat sat on the mat", DIM) + v2 = encode_text("the cat on the mat", DIM) + sim = similarity(v1, v2) + assert sim > 0.5, f"Similar texts low sim: {sim:.4f}" + + def test_empty_text(self): + """Empty text should return a valid vector (the __hrr_empty__ atom).""" + v = encode_text("", DIM) + assert v.shape == (DIM,) + + +class TestEncodeFact: + def test_entity_extraction(self): + """Unbinding entity from fact should recover content signal.""" + content = "prefers rust for systems programming" + entities = ["peppi"] + + fact_vec = encode_fact(content, entities, DIM) + content_vec = encode_text(content, DIM) + + # Unbind: fact - bind(entity, ROLE_ENTITY) should be similar to bind(content, ROLE_CONTENT) + role_entity = encode_atom("__hrr_role_entity__", DIM) + role_content = encode_atom("__hrr_role_content__", DIM) + entity_vec = encode_atom("peppi", DIM) + + # Extract what's associated with peppi's entity role + probe = unbind(fact_vec, bind(entity_vec, role_entity)) + + # The extracted signal should have nonzero similarity to the content-role binding + content_bound = bind(content_vec, role_content) + sim = similarity(probe, content_bound) + # At DIM=256, 2-component bundle: SNR≈11, but phase cosine similarity compresses + # the signal. Noise baseline is ~0.035 std; signal should be above 0.03. 
+ assert sim > 0.03, f"Entity extraction failed: sim={sim:.4f}" + + def test_multiple_entities(self): + """Facts with multiple entities should encode all of them.""" + fact_vec = encode_fact("loves pizza", ["alice", "bob"], DIM) + assert fact_vec.shape == (DIM,) + # Both entities should be recoverable (above noise floor) + role_entity = encode_atom("__hrr_role_entity__", DIM) + for name in ["alice", "bob"]: + entity_vec = encode_atom(name, DIM) + probe = unbind(fact_vec, bind(entity_vec, role_entity)) + # Just verify it's a valid vector (deeper tests would check signal) + assert probe.shape == (DIM,) + + +class TestSerialization: + def test_roundtrip(self): + """bytes_to_phases(phases_to_bytes(v)) should recover v exactly.""" + v = encode_atom("serialize_me", DIM) + data = phases_to_bytes(v) + recovered = bytes_to_phases(data) + np.testing.assert_array_equal(v, recovered) + + def test_byte_size(self): + """float64 * dim = 8 * dim bytes.""" + v = encode_atom("size_check", DIM) + data = phases_to_bytes(v) + assert len(data) == DIM * 8 + + +class TestSNREstimate: + def test_formula(self): + """SNR should match sqrt(dim / n_items).""" + import math + assert snr_estimate(1024, 4) == pytest.approx(math.sqrt(1024 / 4)) + assert snr_estimate(1024, 256) == pytest.approx(math.sqrt(1024 / 256)) + + def test_empty(self): + """Zero items → infinite SNR.""" + assert snr_estimate(1024, 0) == float("inf") + + def test_warning_logged(self, caplog): + """SNR < 2.0 should emit a warning.""" + import logging + with caplog.at_level(logging.WARNING): + snr_estimate(4, 4) # SNR = 1.0 + assert "near capacity" in caplog.text.lower() + + +class TestNumpyGuard: + def test_raises_without_numpy(self): + """All public functions should raise RuntimeError when numpy is absent.""" + import holographic + + original = holographic._HAS_NUMPY + try: + holographic._HAS_NUMPY = False + with pytest.raises(RuntimeError, match="numpy is required"): + encode_atom("test", DIM) + with 
pytest.raises(RuntimeError, match="numpy is required"): + bind(np.zeros(DIM), np.zeros(DIM)) + with pytest.raises(RuntimeError, match="numpy is required"): + unbind(np.zeros(DIM), np.zeros(DIM)) + with pytest.raises(RuntimeError, match="numpy is required"): + bundle(np.zeros(DIM)) + with pytest.raises(RuntimeError, match="numpy is required"): + similarity(np.zeros(DIM), np.zeros(DIM)) + with pytest.raises(RuntimeError, match="numpy is required"): + encode_text("test", DIM) + with pytest.raises(RuntimeError, match="numpy is required"): + encode_fact("test", ["e"], DIM) + with pytest.raises(RuntimeError, match="numpy is required"): + phases_to_bytes(np.zeros(DIM)) + with pytest.raises(RuntimeError, match="numpy is required"): + bytes_to_phases(b"\x00" * DIM * 8) + with pytest.raises(RuntimeError, match="numpy is required"): + snr_estimate(DIM, 1) + finally: + holographic._HAS_NUMPY = original diff --git a/tests/plugins/test_holographic_provider.py b/tests/plugins/test_holographic_provider.py new file mode 100644 index 00000000000..50e76ed60d5 --- /dev/null +++ b/tests/plugins/test_holographic_provider.py @@ -0,0 +1,336 @@ +"""Tests for the holographic memory MemoryProvider adapter. + +Tests the HolographicMemoryProvider interface — registration, tool handling, +prefetch, session end hooks, and memory bridging. 
+""" + +import json +import sys +import pytest +from pathlib import Path +from unittest.mock import MagicMock + +# Add plugin dir to path so imports work +_plugin_dir = Path(__file__).resolve().parent.parent.parent / "plugins" / "hermes-memory-store" +sys.path.insert(0, str(_plugin_dir)) + +from agent.memory_manager import MemoryManager +from agent.builtin_memory_provider import BuiltinMemoryProvider + + +def _make_provider(tmp_path, config=None): + """Create a HolographicMemoryProvider with a temp DB.""" + # Import inside function to avoid module-level issues + sys.path.insert(0, str(_plugin_dir)) + from plugins import HolographicMemoryProvider # noqa: F811 + # Use the full import path + from importlib import import_module + init_mod = import_module("plugins.hermes-memory-store") + + cfg = config or {} + cfg.setdefault("db_path", str(tmp_path / "test.db")) + provider = init_mod.HolographicMemoryProvider(config=cfg) + provider.initialize(session_id="test-session") + return provider + + +@pytest.fixture +def provider(tmp_path): + """Create an initialized holographic provider.""" + sys.path.insert(0, str(_plugin_dir.parent)) + # Direct import + spec_path = _plugin_dir / "__init__.py" + import importlib.util + spec = importlib.util.spec_from_file_location( + "hermes_memory_store_test", + spec_path, + submodule_search_locations=[str(_plugin_dir)], + ) + mod = importlib.util.module_from_spec(spec) + sys.modules["hermes_memory_store_test"] = mod + # Pre-populate submodule references + store_spec = importlib.util.spec_from_file_location( + "hermes_memory_store_test.store", + _plugin_dir / "store.py", + ) + store_mod = importlib.util.module_from_spec(store_spec) + sys.modules["hermes_memory_store_test.store"] = store_mod + store_spec.loader.exec_module(store_mod) + + retrieval_spec = importlib.util.spec_from_file_location( + "hermes_memory_store_test.retrieval", + _plugin_dir / "retrieval.py", + ) + retrieval_mod = importlib.util.module_from_spec(retrieval_spec) + 
sys.modules["hermes_memory_store_test.retrieval"] = retrieval_mod + retrieval_spec.loader.exec_module(retrieval_mod) + + spec.loader.exec_module(mod) + + cfg = {"db_path": str(tmp_path / "test.db")} + p = mod.HolographicMemoryProvider(config=cfg) + p.initialize(session_id="test-session") + yield p + p.shutdown() + + # Cleanup + for key in list(sys.modules): + if key.startswith("hermes_memory_store_test"): + del sys.modules[key] + + +class TestProviderRegistration: + def test_register_calls_register_memory_provider(self, tmp_path): + """register(ctx) should call ctx.register_memory_provider().""" + import importlib.util + + spec = importlib.util.spec_from_file_location( + "hermes_memory_store_reg", + _plugin_dir / "__init__.py", + submodule_search_locations=[str(_plugin_dir)], + ) + mod = importlib.util.module_from_spec(spec) + sys.modules["hermes_memory_store_reg"] = mod + + store_spec = importlib.util.spec_from_file_location( + "hermes_memory_store_reg.store", _plugin_dir / "store.py") + store_mod = importlib.util.module_from_spec(store_spec) + sys.modules["hermes_memory_store_reg.store"] = store_mod + store_spec.loader.exec_module(store_mod) + + retrieval_spec = importlib.util.spec_from_file_location( + "hermes_memory_store_reg.retrieval", _plugin_dir / "retrieval.py") + retrieval_mod = importlib.util.module_from_spec(retrieval_spec) + sys.modules["hermes_memory_store_reg.retrieval"] = retrieval_mod + retrieval_spec.loader.exec_module(retrieval_mod) + + spec.loader.exec_module(mod) + + ctx = MagicMock() + mod.register(ctx) + ctx.register_memory_provider.assert_called_once() + registered = ctx.register_memory_provider.call_args[0][0] + assert registered.name == "holographic" + + for key in list(sys.modules): + if key.startswith("hermes_memory_store_reg"): + del sys.modules[key] + + +class TestToolHandling: + def test_add_and_search(self, provider): + """Add a fact via tool call, then search for it.""" + result = json.loads(provider.handle_tool_call( + 
"fact_store", {"action": "add", "content": "User prefers vim over emacs"} + )) + assert "fact_id" in result + fact_id = result["fact_id"] + + result = json.loads(provider.handle_tool_call( + "fact_store", {"action": "search", "query": "vim emacs"} + )) + assert result["count"] >= 1 + contents = [r["content"] for r in result["results"]] + assert any("vim" in c for c in contents) + + def test_add_and_probe(self, provider): + """Add facts about an entity, then probe it.""" + provider.handle_tool_call( + "fact_store", {"action": "add", "content": "Peppi uses Rust for systems work"} + ) + provider.handle_tool_call( + "fact_store", {"action": "add", "content": "Peppi prefers Neovim"} + ) + + result = json.loads(provider.handle_tool_call( + "fact_store", {"action": "probe", "entity": "peppi"} + )) + assert result["count"] >= 1 + + def test_related(self, provider): + """Test related entity lookup.""" + provider.handle_tool_call( + "fact_store", {"action": "add", "content": "Peppi uses Rust for systems work"} + ) + provider.handle_tool_call( + "fact_store", {"action": "add", "content": "Rust ensures memory safety"} + ) + + result = json.loads(provider.handle_tool_call( + "fact_store", {"action": "related", "entity": "rust"} + )) + assert "results" in result + assert "count" in result + + def test_reason(self, provider): + """Test compositional reasoning across entities.""" + provider.handle_tool_call( + "fact_store", {"action": "add", "content": "Peppi uses Rust for backend work"} + ) + provider.handle_tool_call( + "fact_store", {"action": "add", "content": "The backend handles API requests"} + ) + + result = json.loads(provider.handle_tool_call( + "fact_store", {"action": "reason", "entities": ["peppi", "backend"]} + )) + assert "results" in result + + def test_feedback(self, provider): + """Test trust scoring via feedback.""" + result = json.loads(provider.handle_tool_call( + "fact_store", {"action": "add", "content": "Test feedback fact"} + )) + fact_id = 
result["fact_id"] + + result = json.loads(provider.handle_tool_call( + "fact_feedback", {"action": "helpful", "fact_id": fact_id} + )) + assert "error" not in result + + def test_update_and_remove(self, provider): + """Test CRUD operations.""" + result = json.loads(provider.handle_tool_call( + "fact_store", {"action": "add", "content": "Will be updated"} + )) + fact_id = result["fact_id"] + + # Update + result = json.loads(provider.handle_tool_call( + "fact_store", {"action": "update", "fact_id": fact_id, "content": "Updated content"} + )) + assert result["updated"] + + # Remove + result = json.loads(provider.handle_tool_call( + "fact_store", {"action": "remove", "fact_id": fact_id} + )) + assert result["removed"] + + def test_all_handlers_return_json(self, provider): + """Every tool call must return valid JSON.""" + # Add a fact first + r = provider.handle_tool_call("fact_store", {"action": "add", "content": "JSON test"}) + parsed = json.loads(r) + fact_id = parsed["fact_id"] + + # Test every action + actions = [ + ("fact_store", {"action": "search", "query": "JSON"}), + ("fact_store", {"action": "list"}), + ("fact_store", {"action": "probe", "entity": "test"}), + ("fact_store", {"action": "related", "entity": "test"}), + ("fact_store", {"action": "reason", "entities": ["test"]}), + ("fact_store", {"action": "contradict"}), + ("fact_feedback", {"action": "helpful", "fact_id": fact_id}), + ] + for tool_name, args in actions: + result = provider.handle_tool_call(tool_name, args) + json.loads(result) # Should not raise + + +class TestPrefetch: + def test_prefetch_returns_matching_facts(self, provider): + """Prefetch should return facts matching the query.""" + provider.handle_tool_call( + "fact_store", {"action": "add", "content": "The deploy pipeline uses Docker"} + ) + result = provider.prefetch("deploy pipeline") + assert "Docker" in result or "deploy" in result + + def test_prefetch_empty_when_no_facts(self, provider): + assert provider.prefetch("anything") == "" 
def _fact_count(provider):
    """Count stored facts via the fact_store 'list' action."""
    listing = json.loads(provider.handle_tool_call("fact_store", {"action": "list"}))
    return listing["count"]


class TestSystemPromptBlock:
    def test_empty_when_no_facts(self, provider):
        """With nothing stored, the prompt block is empty."""
        assert provider.system_prompt_block() == ""

    def test_shows_count_with_facts(self, provider):
        """The prompt block advertises how many facts are stored."""
        for content in ("Fact one", "Fact two"):
            provider.handle_tool_call("fact_store", {"action": "add", "content": content})
        prompt = provider.system_prompt_block()
        assert "2 facts" in prompt
        assert "Holographic" in prompt


class TestSessionEndHook:
    def test_extracts_preferences(self, provider):
        """on_session_end should extract preference patterns."""
        provider._config["auto_extract"] = True
        transcript = [
            {"role": "user", "content": "I prefer dark mode for all my editors"},
            {"role": "assistant", "content": "Noted, I'll remember that."},
        ]
        provider.on_session_end(transcript)
        assert _fact_count(provider) >= 1

    def test_skips_when_disabled(self, provider):
        """on_session_end should do nothing when auto_extract is False."""
        provider._config["auto_extract"] = False
        provider.on_session_end([{"role": "user", "content": "I prefer dark mode"}])
        assert _fact_count(provider) == 0

    def test_skips_assistant_messages(self, provider):
        """Only user messages should be scanned."""
        provider._config["auto_extract"] = True
        provider.on_session_end(
            [{"role": "assistant", "content": "I prefer to help you with that"}]
        )
        assert _fact_count(provider) == 0


class TestMemoryBridge:
    def test_mirrors_builtin_writes(self, provider):
        """on_memory_write should store facts from the builtin memory tool."""
        provider.on_memory_write("add", "user", "Timezone: US Pacific")
        hits = json.loads(
            provider.handle_tool_call(
                "fact_store", {"action": "search", "query": "timezone pacific"}
            )
        )
        assert hits["count"] >= 1


class TestManagerIntegration:
    def test_coexists_with_builtin(self, provider):
        """Holographic provider works alongside builtin in MemoryManager."""
        manager = MemoryManager()
        manager.add_provider(BuiltinMemoryProvider())
        manager.add_provider(provider)
        assert manager.provider_names == ["builtin", "holographic"]

        # Tools contributed by the holographic provider are visible.
        tool_names = {schema["name"] for schema in manager.get_all_tool_schemas()}
        assert "fact_store" in tool_names
        assert "fact_feedback" in tool_names

        # Tool routing through the manager works.
        added = json.loads(
            manager.handle_tool_call(
                "fact_store", {"action": "add", "content": "Manager integration test"}
            )
        )
        assert added["status"] == "added"

        # Memory bridge fires on builtin writes.
        manager.on_memory_write("add", "memory", "Test fact from builtin")
        found = json.loads(
            manager.handle_tool_call(
                "fact_store", {"action": "search", "query": "test fact builtin"}
            )
        )
        assert found["count"] >= 1