mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-10 12:18:44 +08:00
Compare commits
8 Commits
feat/dashb
...
refactor/s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a30950cd70 | ||
|
|
5ea4cec6cc | ||
|
|
f6de97fd8a | ||
|
|
13ffc5d391 | ||
|
|
b80b03d8b6 | ||
|
|
82daac5f11 | ||
|
|
cfecc6a6aa | ||
|
|
68181bf357 |
@@ -64,7 +64,6 @@ def mirror_to_session(
|
||||
"mirror_source": source_label,
|
||||
}
|
||||
|
||||
_append_to_jsonl(session_id, mirror_msg)
|
||||
_append_to_sqlite(session_id, mirror_msg)
|
||||
|
||||
logger.debug("Mirror: wrote to session %s (from %s)", session_id, source_label)
|
||||
@@ -150,15 +149,6 @@ def _find_session_id(
|
||||
return best_entry.get("session_id")
|
||||
|
||||
|
||||
def _append_to_jsonl(session_id: str, message: dict) -> None:
|
||||
"""Append a message to the JSONL transcript file."""
|
||||
transcript_path = _SESSIONS_DIR / f"{session_id}.jsonl"
|
||||
try:
|
||||
with open(transcript_path, "a", encoding="utf-8") as f:
|
||||
f.write(json.dumps(message, ensure_ascii=False) + "\n")
|
||||
except Exception as e:
|
||||
logger.debug("Mirror JSONL write failed: %s", e)
|
||||
|
||||
|
||||
def _append_to_sqlite(session_id: str, message: dict) -> None:
|
||||
"""Append a message to the SQLite session database."""
|
||||
|
||||
@@ -1410,32 +1410,24 @@ class RecallGuardMiddleware(InboundMiddleware):
|
||||
logger.warning("[%s] Recall: failed to resolve session: %s", adapter.name, exc)
|
||||
return
|
||||
|
||||
# Read JSONL directly — SQLite doesn't preserve message_id field.
|
||||
transcript: list = []
|
||||
# Load transcript from canonical store (state.db). See Branch A below
|
||||
# for why we can no longer match by platform `message_id`.
|
||||
try:
|
||||
path = store.get_transcript_path(sid)
|
||||
if path.exists():
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line:
|
||||
try:
|
||||
transcript.append(json.loads(line))
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
transcript = store.load_transcript(sid)
|
||||
except Exception as exc:
|
||||
logger.warning("[%s] Recall: failed to load transcript: %s", adapter.name, exc)
|
||||
return
|
||||
|
||||
# Branch A: redact — try message_id first, then content fallback.
|
||||
# Observed messages have message_id; agent-processed @bot messages
|
||||
# only have content (run.py doesn't write message_id to transcript).
|
||||
# Branch A: content-match redaction. state.db does NOT preserve the
|
||||
# platform `message_id` (only its own autoincrement primary key), so we
|
||||
# cannot redact by exact id. Match by content instead. Most yuanbao
|
||||
# recalls carry the recalled text via `recalled_content`, which is
|
||||
# sufficient for any non-duplicate message.
|
||||
#
|
||||
# TODO: add a `platform_message_id` column to state.db messages to
|
||||
# restore exact-id matching. Tracked separately.
|
||||
target = None
|
||||
for entry in transcript:
|
||||
if entry.get("message_id") == recalled_id:
|
||||
target = entry
|
||||
break
|
||||
if target is None and recalled_content:
|
||||
if recalled_content:
|
||||
for entry in transcript:
|
||||
if entry.get("role") == "user" and entry.get("content") == recalled_content:
|
||||
target = entry
|
||||
@@ -1444,7 +1436,7 @@ class RecallGuardMiddleware(InboundMiddleware):
|
||||
target["content"] = cls._REDACTED
|
||||
try:
|
||||
store.rewrite_transcript(sid, transcript)
|
||||
logger.info("[%s] Recall: redacted msg_id=%s (branch A)", adapter.name, recalled_id)
|
||||
logger.info("[%s] Recall: redacted msg_id=%s (branch A: content match)", adapter.name, recalled_id)
|
||||
except Exception as exc:
|
||||
logger.warning("[%s] Recall: rewrite_transcript failed: %s", adapter.name, exc)
|
||||
return
|
||||
|
||||
@@ -1248,20 +1248,15 @@ class SessionStore:
|
||||
|
||||
return entries
|
||||
|
||||
def get_transcript_path(self, session_id: str) -> Path:
|
||||
"""Get the path to a session's legacy transcript file."""
|
||||
return self.sessions_dir / f"{session_id}.jsonl"
|
||||
|
||||
def append_to_transcript(self, session_id: str, message: Dict[str, Any], skip_db: bool = False) -> None:
|
||||
"""Append a message to a session's transcript (SQLite + legacy JSONL).
|
||||
"""Append a message to a session's transcript (SQLite).
|
||||
|
||||
Args:
|
||||
skip_db: When True, only write to JSONL and skip the SQLite write.
|
||||
Used when the agent already persisted messages to SQLite
|
||||
via its own _flush_messages_to_session_db(), preventing
|
||||
the duplicate-write bug (#860).
|
||||
skip_db: When True, skip the SQLite write. Used when the agent
|
||||
already persisted messages to SQLite via its own
|
||||
_flush_messages_to_session_db(), preventing the
|
||||
duplicate-write bug (#860).
|
||||
"""
|
||||
# Write to SQLite (unless the agent already handled it)
|
||||
if self._db and not skip_db:
|
||||
try:
|
||||
self._db.append_message(
|
||||
@@ -1279,91 +1274,33 @@ class SessionStore:
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug("Session DB operation failed: %s", e)
|
||||
|
||||
# Also write legacy JSONL (keeps existing tooling working during transition)
|
||||
transcript_path = self.get_transcript_path(session_id)
|
||||
try:
|
||||
with self._lock:
|
||||
with open(transcript_path, "a", encoding="utf-8") as f:
|
||||
f.write(json.dumps(message, ensure_ascii=False) + "\n")
|
||||
except OSError as e:
|
||||
# Disk full / read-only fs / permission errors must not crash the
|
||||
# message handler — the SQLite write above is the primary store.
|
||||
logger.debug("Failed to write JSONL transcript for %s: %s", session_id, e)
|
||||
|
||||
def rewrite_transcript(self, session_id: str, messages: List[Dict[str, Any]]) -> None:
|
||||
"""Replace the entire transcript for a session with new messages.
|
||||
|
||||
Used by /retry, /undo, and /compress to persist modified conversation history.
|
||||
Rewrites both SQLite and legacy JSONL storage.
|
||||
|
||||
Used by /retry, /undo, and /compress to persist modified conversation
|
||||
history. state.db is the canonical store.
|
||||
"""
|
||||
# SQLite: replace atomically so a mid-rewrite failure doesn't leave
|
||||
# the session half-empty in the DB while JSONL still has history.
|
||||
if self._db:
|
||||
try:
|
||||
self._db.replace_messages(session_id, messages)
|
||||
except Exception as e:
|
||||
logger.debug("Failed to rewrite transcript in DB: %s", e)
|
||||
|
||||
# JSONL: overwrite the file
|
||||
transcript_path = self.get_transcript_path(session_id)
|
||||
with open(transcript_path, "w", encoding="utf-8") as f:
|
||||
for msg in messages:
|
||||
f.write(json.dumps(msg, ensure_ascii=False) + "\n")
|
||||
|
||||
def load_transcript(self, session_id: str) -> List[Dict[str, Any]]:
|
||||
"""Load all messages from a session's transcript."""
|
||||
db_messages = []
|
||||
# Try SQLite first
|
||||
if self._db:
|
||||
try:
|
||||
db_messages = self._db.get_messages_as_conversation(session_id)
|
||||
except Exception as e:
|
||||
logger.debug("Could not load messages from DB: %s", e)
|
||||
"""Load all messages from a session's transcript.
|
||||
|
||||
# Load legacy JSONL transcript (may contain more history than SQLite
|
||||
# for sessions created before the DB layer was introduced).
|
||||
transcript_path = self.get_transcript_path(session_id)
|
||||
jsonl_messages = []
|
||||
if transcript_path.exists():
|
||||
try:
|
||||
with open(transcript_path, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line:
|
||||
try:
|
||||
jsonl_messages.append(json.loads(line))
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(
|
||||
"Skipping corrupt line in transcript %s: %s",
|
||||
session_id, line[:120],
|
||||
)
|
||||
except OSError as e:
|
||||
# JSONL is the legacy compatibility store. If it becomes
|
||||
# unreadable, keep gateway recovery working by falling back to
|
||||
# SQLite rows loaded above (or [] when no DB exists).
|
||||
logger.debug("Failed to read JSONL transcript for %s: %s", session_id, e)
|
||||
|
||||
# Prefer whichever source has more messages.
|
||||
#
|
||||
# Background: when a session pre-dates SQLite storage (or when the DB
|
||||
# layer was added while a long-lived session was already active), the
|
||||
# first post-migration turn writes only the *new* messages to SQLite
|
||||
# (because _flush_messages_to_session_db skips messages already in
|
||||
# conversation_history, assuming they're persisted). On the *next*
|
||||
# turn load_transcript returns those few SQLite rows and ignores the
|
||||
# full JSONL history — the model sees a context of 1-4 messages instead
|
||||
# of hundreds. Using the longer source prevents this silent truncation.
|
||||
if len(jsonl_messages) > len(db_messages):
|
||||
if db_messages:
|
||||
logger.debug(
|
||||
"Session %s: JSONL has %d messages vs SQLite %d — "
|
||||
"using JSONL (legacy session not yet fully migrated)",
|
||||
session_id, len(jsonl_messages), len(db_messages),
|
||||
)
|
||||
return jsonl_messages
|
||||
|
||||
return db_messages
|
||||
state.db is the canonical store. The legacy JSONL fallback was removed
|
||||
in spec 002 — pre-DB sessions on existing disks have already been
|
||||
migrated (their DB row holds the full message history).
|
||||
"""
|
||||
if not self._db:
|
||||
return []
|
||||
try:
|
||||
return self._db.get_messages_as_conversation(session_id)
|
||||
except Exception as e:
|
||||
logger.debug("Could not load messages from DB: %s", e)
|
||||
return []
|
||||
|
||||
|
||||
def build_session_context(
|
||||
|
||||
0
tests/gateway/platforms/__init__.py
Normal file
0
tests/gateway/platforms/__init__.py
Normal file
31
tests/gateway/platforms/test_yuanbao_recall_db_only.py
Normal file
31
tests/gateway/platforms/test_yuanbao_recall_db_only.py
Normal file
@@ -0,0 +1,31 @@
|
||||
"""Yuanbao recall: branch A (content-match) works against DB-only transcripts."""
|
||||
from gateway.session import SessionStore
|
||||
from gateway.config import GatewayConfig
|
||||
|
||||
|
||||
def test_recall_content_match_finds_target_in_db_transcript(tmp_path, monkeypatch):
|
||||
"""state.db doesn't preserve message_id, so recall uses content-match.
|
||||
|
||||
Pin DEFAULT_DB_PATH to tmp_path so SessionDB() can't write to the real
|
||||
~/.hermes/state.db. (Module-level constant snapshot, see test_load_transcript_db_only.)
|
||||
"""
|
||||
import hermes_state
|
||||
monkeypatch.setattr(hermes_state, "DEFAULT_DB_PATH", tmp_path / "state.db")
|
||||
|
||||
config = GatewayConfig()
|
||||
store = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
|
||||
sid = "test-yuanbao-recall"
|
||||
store._db.create_session(session_id=sid, source="yuanbao:group:G")
|
||||
store.append_to_transcript(sid, {"role": "user", "content": "sensitive content", "timestamp": 1.0})
|
||||
store.append_to_transcript(sid, {"role": "assistant", "content": "ack", "timestamp": 2.0})
|
||||
|
||||
# DB-only history carries no platform message_id (PR #29211 dropped that path).
|
||||
history = store.load_transcript(sid)
|
||||
assert all("message_id" not in msg for msg in history)
|
||||
|
||||
# Branch A: content match finds the target row that recall would redact.
|
||||
target = next((m for m in history
|
||||
if m.get("role") == "user" and m.get("content") == "sensitive content"), None)
|
||||
assert target is not None
|
||||
# Caller would then redact: target["content"] = REDACTED; store.rewrite_transcript(sid, history)
|
||||
32
tests/gateway/test_load_transcript_db_only.py
Normal file
32
tests/gateway/test_load_transcript_db_only.py
Normal file
@@ -0,0 +1,32 @@
|
||||
"""Verify load_transcript returns SQLite messages without any JSONL file."""
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.session import SessionStore
|
||||
from gateway.config import GatewayConfig
|
||||
|
||||
|
||||
def test_load_transcript_returns_db_messages_when_no_jsonl(tmp_path, monkeypatch):
|
||||
"""Reading a transcript must work from SQLite alone — no JSONL fallback needed.
|
||||
|
||||
Pin DEFAULT_DB_PATH to tmp_path so this test cannot write to the real
|
||||
~/.hermes/state.db. (DEFAULT_DB_PATH is a module-level constant computed
|
||||
at hermes_state import time, before pytest's HERMES_HOME monkeypatch
|
||||
fires — the autouse fixture's HERMES_HOME override doesn't help here.)
|
||||
"""
|
||||
import hermes_state
|
||||
monkeypatch.setattr(hermes_state, "DEFAULT_DB_PATH", tmp_path / "state.db")
|
||||
|
||||
config = GatewayConfig()
|
||||
store = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
|
||||
sid = "test-session-db-only"
|
||||
store._db.create_session(session_id=sid, source="test")
|
||||
store.append_to_transcript(sid, {"role": "user", "content": "hello", "timestamp": 1.0})
|
||||
store.append_to_transcript(sid, {"role": "assistant", "content": "world", "timestamp": 2.0})
|
||||
|
||||
history = store.load_transcript(sid)
|
||||
assert len(history) == 2
|
||||
assert history[0]["content"] == "hello"
|
||||
assert history[1]["content"] == "world"
|
||||
@@ -8,7 +8,6 @@ import gateway.mirror as mirror_mod
|
||||
from gateway.mirror import (
|
||||
mirror_to_session,
|
||||
_find_session_id,
|
||||
_append_to_jsonl,
|
||||
)
|
||||
|
||||
|
||||
@@ -152,33 +151,6 @@ class TestFindSessionId:
|
||||
assert result == "sess_1"
|
||||
|
||||
|
||||
class TestAppendToJsonl:
|
||||
def test_appends_message(self, tmp_path):
|
||||
sessions_dir = tmp_path / "sessions"
|
||||
sessions_dir.mkdir()
|
||||
|
||||
with patch.object(mirror_mod, "_SESSIONS_DIR", sessions_dir):
|
||||
_append_to_jsonl("sess_1", {"role": "assistant", "content": "Hello"})
|
||||
|
||||
transcript = sessions_dir / "sess_1.jsonl"
|
||||
lines = transcript.read_text().strip().splitlines()
|
||||
assert len(lines) == 1
|
||||
msg = json.loads(lines[0])
|
||||
assert msg["role"] == "assistant"
|
||||
assert msg["content"] == "Hello"
|
||||
|
||||
def test_appends_multiple_messages(self, tmp_path):
|
||||
sessions_dir = tmp_path / "sessions"
|
||||
sessions_dir.mkdir()
|
||||
|
||||
with patch.object(mirror_mod, "_SESSIONS_DIR", sessions_dir):
|
||||
_append_to_jsonl("sess_1", {"role": "assistant", "content": "msg1"})
|
||||
_append_to_jsonl("sess_1", {"role": "assistant", "content": "msg2"})
|
||||
|
||||
transcript = sessions_dir / "sess_1.jsonl"
|
||||
lines = transcript.read_text().strip().splitlines()
|
||||
assert len(lines) == 2
|
||||
|
||||
|
||||
class TestMirrorToSession:
|
||||
def test_successful_mirror(self, tmp_path):
|
||||
@@ -192,15 +164,16 @@ class TestMirrorToSession:
|
||||
|
||||
with patch.object(mirror_mod, "_SESSIONS_DIR", sessions_dir), \
|
||||
patch.object(mirror_mod, "_SESSIONS_INDEX", index_file), \
|
||||
patch("gateway.mirror._append_to_sqlite"):
|
||||
patch("gateway.mirror._append_to_sqlite") as mock_sqlite:
|
||||
result = mirror_to_session("telegram", "12345", "Hello!", source_label="cli")
|
||||
|
||||
assert result is True
|
||||
|
||||
# Check JSONL was written
|
||||
transcript = sessions_dir / "sess_abc.jsonl"
|
||||
assert transcript.exists()
|
||||
msg = json.loads(transcript.read_text().strip())
|
||||
# Check SQLite writer was called with the mirror message
|
||||
mock_sqlite.assert_called_once()
|
||||
call_args = mock_sqlite.call_args
|
||||
assert call_args[0][0] == "sess_abc"
|
||||
msg = call_args[0][1]
|
||||
assert msg["content"] == "Hello!"
|
||||
assert msg["role"] == "assistant"
|
||||
assert msg["mirror"] is True
|
||||
@@ -222,12 +195,12 @@ class TestMirrorToSession:
|
||||
|
||||
with patch.object(mirror_mod, "_SESSIONS_DIR", sessions_dir), \
|
||||
patch.object(mirror_mod, "_SESSIONS_INDEX", index_file), \
|
||||
patch("gateway.mirror._append_to_sqlite"):
|
||||
patch("gateway.mirror._append_to_sqlite") as mock_sqlite:
|
||||
result = mirror_to_session("telegram", "-1001", "Hello topic!", source_label="cron", thread_id="10")
|
||||
|
||||
assert result is True
|
||||
assert (sessions_dir / "sess_topic_a.jsonl").exists()
|
||||
assert not (sessions_dir / "sess_topic_b.jsonl").exists()
|
||||
mock_sqlite.assert_called_once()
|
||||
assert mock_sqlite.call_args[0][0] == "sess_topic_a"
|
||||
|
||||
def test_successful_mirror_uses_user_id_for_group_session(self, tmp_path):
|
||||
sessions_dir, index_file = _setup_sessions(tmp_path, {
|
||||
@@ -245,7 +218,7 @@ class TestMirrorToSession:
|
||||
|
||||
with patch.object(mirror_mod, "_SESSIONS_DIR", sessions_dir), \
|
||||
patch.object(mirror_mod, "_SESSIONS_INDEX", index_file), \
|
||||
patch("gateway.mirror._append_to_sqlite"):
|
||||
patch("gateway.mirror._append_to_sqlite") as mock_sqlite:
|
||||
result = mirror_to_session(
|
||||
"telegram",
|
||||
"-1001",
|
||||
@@ -255,8 +228,8 @@ class TestMirrorToSession:
|
||||
)
|
||||
|
||||
assert result is True
|
||||
assert (sessions_dir / "sess_alice.jsonl").exists()
|
||||
assert not (sessions_dir / "sess_bob.jsonl").exists()
|
||||
mock_sqlite.assert_called_once()
|
||||
assert mock_sqlite.call_args[0][0] == "sess_alice"
|
||||
|
||||
def test_no_matching_session(self, tmp_path):
|
||||
sessions_dir, index_file = _setup_sessions(tmp_path, {})
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""Regression tests for /retry replacement semantics."""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -11,14 +11,17 @@ from gateway.session import SessionStore
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_gateway_retry_replaces_last_user_turn_in_transcript(tmp_path):
|
||||
async def test_gateway_retry_replaces_last_user_turn_in_transcript(tmp_path, monkeypatch):
|
||||
# Pin DEFAULT_DB_PATH so SessionDB() doesn't write to the real ~/.hermes/state.db.
|
||||
# (Module-level constant snapshot, see test_load_transcript_db_only.)
|
||||
import hermes_state
|
||||
monkeypatch.setattr(hermes_state, "DEFAULT_DB_PATH", tmp_path / "state.db")
|
||||
|
||||
config = GatewayConfig()
|
||||
with patch("gateway.session.SessionStore._ensure_loaded"):
|
||||
store = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
store._db = None
|
||||
store._loaded = True
|
||||
store = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
|
||||
session_id = "retry_session"
|
||||
store._db.create_session(session_id=session_id, source="test")
|
||||
for msg in [
|
||||
{"role": "session_meta", "tools": []},
|
||||
{"role": "user", "content": "first question"},
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
"""Tests for gateway session management."""
|
||||
|
||||
import builtins
|
||||
import json
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
@@ -503,19 +501,19 @@ class TestSenderPrefixWithBackfill:
|
||||
|
||||
|
||||
class TestSessionStoreRewriteTranscript:
|
||||
"""Regression: /retry and /undo must persist truncated history to disk."""
|
||||
"""Regression: /retry and /undo must persist truncated history to DB."""
|
||||
|
||||
@pytest.fixture()
|
||||
def store(self, tmp_path):
|
||||
def store(self, tmp_path, monkeypatch):
|
||||
import hermes_state
|
||||
monkeypatch.setattr(hermes_state, "DEFAULT_DB_PATH", tmp_path / "state.db")
|
||||
config = GatewayConfig()
|
||||
with patch("gateway.session.SessionStore._ensure_loaded"):
|
||||
s = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
s._db = None # no SQLite for these tests
|
||||
s._loaded = True
|
||||
s = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
return s
|
||||
|
||||
def test_rewrite_replaces_jsonl(self, store, tmp_path):
|
||||
def test_rewrite_replaces_transcript(self, store, tmp_path):
|
||||
session_id = "test_session_1"
|
||||
store._db.create_session(session_id=session_id, source="test")
|
||||
# Write initial transcript
|
||||
for msg in [
|
||||
{"role": "user", "content": "hello"},
|
||||
@@ -538,6 +536,7 @@ class TestSessionStoreRewriteTranscript:
|
||||
|
||||
def test_rewrite_with_empty_list(self, store):
|
||||
session_id = "test_session_2"
|
||||
store._db.create_session(session_id=session_id, source="test")
|
||||
store.append_to_transcript(session_id, {"role": "user", "content": "hi"})
|
||||
|
||||
store.rewrite_transcript(session_id, [])
|
||||
@@ -546,171 +545,28 @@ class TestSessionStoreRewriteTranscript:
|
||||
assert reloaded == []
|
||||
|
||||
|
||||
class TestLoadTranscriptCorruptLines:
|
||||
"""Regression: corrupt JSONL lines (e.g. from mid-write crash) must be
|
||||
skipped instead of crashing the entire transcript load. GH-1193."""
|
||||
class TestLoadTranscriptDBOnly:
|
||||
"""After spec 002, load_transcript reads only from state.db."""
|
||||
|
||||
@pytest.fixture()
|
||||
def store(self, tmp_path):
|
||||
def test_db_only_returns_empty_for_nonexistent(self, tmp_path, monkeypatch):
|
||||
import hermes_state
|
||||
monkeypatch.setattr(hermes_state, "DEFAULT_DB_PATH", tmp_path / "state.db")
|
||||
config = GatewayConfig()
|
||||
with patch("gateway.session.SessionStore._ensure_loaded"):
|
||||
s = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
s._db = None
|
||||
s._loaded = True
|
||||
return s
|
||||
|
||||
def test_corrupt_line_skipped(self, store, tmp_path):
|
||||
session_id = "corrupt_test"
|
||||
transcript_path = store.get_transcript_path(session_id)
|
||||
transcript_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(transcript_path, "w") as f:
|
||||
f.write('{"role": "user", "content": "hello"}\n')
|
||||
f.write('{"role": "assistant", "content": "hi th') # truncated
|
||||
f.write("\n")
|
||||
f.write('{"role": "user", "content": "goodbye"}\n')
|
||||
|
||||
messages = store.load_transcript(session_id)
|
||||
assert len(messages) == 2
|
||||
assert messages[0]["content"] == "hello"
|
||||
assert messages[1]["content"] == "goodbye"
|
||||
|
||||
def test_all_lines_corrupt_returns_empty(self, store, tmp_path):
|
||||
session_id = "all_corrupt"
|
||||
transcript_path = store.get_transcript_path(session_id)
|
||||
transcript_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(transcript_path, "w") as f:
|
||||
f.write("not json at all\n")
|
||||
f.write("{truncated\n")
|
||||
|
||||
messages = store.load_transcript(session_id)
|
||||
assert messages == []
|
||||
|
||||
def test_valid_transcript_unaffected(self, store, tmp_path):
|
||||
session_id = "valid_test"
|
||||
store.append_to_transcript(session_id, {"role": "user", "content": "a"})
|
||||
store.append_to_transcript(session_id, {"role": "assistant", "content": "b"})
|
||||
|
||||
messages = store.load_transcript(session_id)
|
||||
assert len(messages) == 2
|
||||
assert messages[0]["content"] == "a"
|
||||
assert messages[1]["content"] == "b"
|
||||
|
||||
|
||||
class TestLoadTranscriptPreferLongerSource:
|
||||
"""Regression: load_transcript must return whichever source (SQLite or JSONL)
|
||||
has more messages to prevent silent truncation. GH-3212."""
|
||||
|
||||
@pytest.fixture()
|
||||
def store_with_db(self, tmp_path):
|
||||
"""SessionStore with both SQLite and JSONL active."""
|
||||
from hermes_state import SessionDB
|
||||
|
||||
config = GatewayConfig()
|
||||
with patch("gateway.session.SessionStore._ensure_loaded"):
|
||||
s = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
s._db = SessionDB(db_path=tmp_path / "state.db")
|
||||
s._loaded = True
|
||||
return s
|
||||
|
||||
def test_jsonl_longer_than_sqlite_returns_jsonl(self, store_with_db):
|
||||
"""Legacy session: JSONL has full history, SQLite has only recent turn."""
|
||||
sid = "legacy_session"
|
||||
store_with_db._db.create_session(session_id=sid, source="gateway", model="m")
|
||||
# JSONL has 10 messages (legacy history — written before SQLite existed)
|
||||
for i in range(10):
|
||||
role = "user" if i % 2 == 0 else "assistant"
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": role, "content": f"msg-{i}"}, skip_db=True,
|
||||
)
|
||||
# SQLite has only 2 messages (recent turn after migration)
|
||||
store_with_db._db.append_message(session_id=sid, role="user", content="new-q")
|
||||
store_with_db._db.append_message(session_id=sid, role="assistant", content="new-a")
|
||||
|
||||
result = store_with_db.load_transcript(sid)
|
||||
assert len(result) == 10
|
||||
assert result[0]["content"] == "msg-0"
|
||||
|
||||
def test_sqlite_longer_than_jsonl_returns_sqlite(self, store_with_db):
|
||||
"""Fully migrated session: SQLite has more (JSONL stopped growing)."""
|
||||
sid = "migrated_session"
|
||||
store_with_db._db.create_session(session_id=sid, source="gateway", model="m")
|
||||
# JSONL has 2 old messages
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "user", "content": "old-q"}, skip_db=True,
|
||||
)
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "assistant", "content": "old-a"}, skip_db=True,
|
||||
)
|
||||
# SQLite has 4 messages (superset after migration)
|
||||
for i in range(4):
|
||||
role = "user" if i % 2 == 0 else "assistant"
|
||||
store_with_db._db.append_message(session_id=sid, role=role, content=f"db-{i}")
|
||||
|
||||
result = store_with_db.load_transcript(sid)
|
||||
assert len(result) == 4
|
||||
assert result[0]["content"] == "db-0"
|
||||
|
||||
def test_sqlite_empty_falls_back_to_jsonl(self, store_with_db):
|
||||
"""No SQLite rows — falls back to JSONL (original behavior preserved)."""
|
||||
sid = "no_db_rows"
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "user", "content": "hello"}, skip_db=True,
|
||||
)
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "assistant", "content": "hi"}, skip_db=True,
|
||||
)
|
||||
|
||||
result = store_with_db.load_transcript(sid)
|
||||
assert len(result) == 2
|
||||
assert result[0]["content"] == "hello"
|
||||
|
||||
def test_both_empty_returns_empty(self, store_with_db):
|
||||
"""Neither source has data — returns empty list."""
|
||||
result = store_with_db.load_transcript("nonexistent")
|
||||
store = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
result = store.load_transcript("nonexistent")
|
||||
assert result == []
|
||||
|
||||
def test_equal_length_prefers_sqlite(self, store_with_db):
|
||||
"""When both have same count, SQLite wins (has richer fields like reasoning)."""
|
||||
sid = "equal_session"
|
||||
store_with_db._db.create_session(session_id=sid, source="gateway", model="m")
|
||||
# Write 2 messages to JSONL only
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "user", "content": "jsonl-q"}, skip_db=True,
|
||||
)
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "assistant", "content": "jsonl-a"}, skip_db=True,
|
||||
)
|
||||
# Write 2 different messages to SQLite only
|
||||
store_with_db._db.append_message(session_id=sid, role="user", content="db-q")
|
||||
store_with_db._db.append_message(session_id=sid, role="assistant", content="db-a")
|
||||
def test_db_only_returns_messages(self, tmp_path, monkeypatch):
|
||||
import hermes_state
|
||||
monkeypatch.setattr(hermes_state, "DEFAULT_DB_PATH", tmp_path / "state.db")
|
||||
config = GatewayConfig()
|
||||
store = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
sid = "db_only_session"
|
||||
store._db.create_session(session_id=sid, source="gateway", model="m")
|
||||
store._db.append_message(session_id=sid, role="user", content="db-q")
|
||||
store._db.append_message(session_id=sid, role="assistant", content="db-a")
|
||||
|
||||
result = store_with_db.load_transcript(sid)
|
||||
assert len(result) == 2
|
||||
# Should be the SQLite version (equal count → prefers SQLite)
|
||||
assert result[0]["content"] == "db-q"
|
||||
|
||||
def test_unreadable_jsonl_returns_sqlite(self, store_with_db, monkeypatch):
|
||||
"""Unreadable legacy JSONL must not hide valid SQLite history."""
|
||||
sid = "unreadable_jsonl"
|
||||
store_with_db._db.create_session(session_id=sid, source="gateway", model="m")
|
||||
store_with_db._db.append_message(session_id=sid, role="user", content="db-q")
|
||||
store_with_db._db.append_message(session_id=sid, role="assistant", content="db-a")
|
||||
|
||||
transcript_path = store_with_db.get_transcript_path(sid)
|
||||
transcript_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
transcript_path.write_text('{"role": "user", "content": "jsonl-q"}\n', encoding="utf-8")
|
||||
|
||||
real_open = builtins.open
|
||||
|
||||
def raise_for_transcript(path, *args, **kwargs):
|
||||
mode = args[0] if args else kwargs.get("mode", "r")
|
||||
if Path(path) == transcript_path and "r" in mode:
|
||||
raise OSError("simulated unreadable transcript")
|
||||
return real_open(path, *args, **kwargs)
|
||||
|
||||
monkeypatch.setattr(builtins, "open", raise_for_transcript)
|
||||
|
||||
result = store_with_db.load_transcript(sid)
|
||||
result = store.load_transcript(sid)
|
||||
assert len(result) == 2
|
||||
assert result[0]["content"] == "db-q"
|
||||
assert result[1]["content"] == "db-a"
|
||||
|
||||
@@ -22,13 +22,18 @@ from gateway.session import SessionSource, SessionStore, build_session_key
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def store(tmp_path):
|
||||
"""SessionStore with no SQLite, for fast unit tests."""
|
||||
def store(tmp_path, monkeypatch):
|
||||
"""SessionStore with SQLite — load_transcript reads from DB only.
|
||||
|
||||
Pin DEFAULT_DB_PATH to tmp_path so SessionDB() can't write to the real
|
||||
~/.hermes/state.db. (DEFAULT_DB_PATH is a module-level constant computed
|
||||
at hermes_state import time, before pytest's HERMES_HOME monkeypatch
|
||||
fires — the autouse fixture's HERMES_HOME override doesn't help here.)
|
||||
"""
|
||||
import hermes_state
|
||||
monkeypatch.setattr(hermes_state, "DEFAULT_DB_PATH", tmp_path / "state.db")
|
||||
config = GatewayConfig()
|
||||
with patch("gateway.session.SessionStore._ensure_loaded"):
|
||||
s = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
s._db = None
|
||||
s._loaded = True
|
||||
s = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
return s
|
||||
|
||||
|
||||
|
||||
@@ -172,33 +172,7 @@ class TestFlushDeduplication:
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestAppendToTranscriptSkipDb:
|
||||
"""Verify skip_db=True writes JSONL but not SQLite."""
|
||||
|
||||
@pytest.fixture()
|
||||
def store(self, tmp_path):
|
||||
from gateway.config import GatewayConfig
|
||||
from gateway.session import SessionStore
|
||||
config = GatewayConfig()
|
||||
with patch("gateway.session.SessionStore._ensure_loaded"):
|
||||
s = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
s._db = None # no SQLite for these JSONL-focused tests
|
||||
s._loaded = True
|
||||
return s
|
||||
|
||||
def test_skip_db_writes_jsonl_only(self, store, tmp_path):
|
||||
"""With skip_db=True, message appears in JSONL but not SQLite."""
|
||||
session_id = "test-skip-db"
|
||||
msg = {"role": "assistant", "content": "hello world"}
|
||||
store.append_to_transcript(session_id, msg, skip_db=True)
|
||||
|
||||
# JSONL should have the message
|
||||
jsonl_path = store.get_transcript_path(session_id)
|
||||
assert jsonl_path.exists()
|
||||
with open(jsonl_path) as f:
|
||||
lines = f.readlines()
|
||||
assert len(lines) == 1
|
||||
parsed = json.loads(lines[0])
|
||||
assert parsed["content"] == "hello world"
|
||||
"""Verify skip_db=True skips the SQLite write."""
|
||||
|
||||
def test_skip_db_prevents_sqlite_write(self, tmp_path):
|
||||
"""With skip_db=True and a real DB, message does NOT appear in SQLite."""
|
||||
@@ -225,14 +199,8 @@ class TestAppendToTranscriptSkipDb:
|
||||
rows = db.get_messages(session_id)
|
||||
assert len(rows) == 0, f"Expected 0 DB rows with skip_db=True, got {len(rows)}"
|
||||
|
||||
# But JSONL should have it
|
||||
jsonl_path = store.get_transcript_path(session_id)
|
||||
with open(jsonl_path) as f:
|
||||
lines = f.readlines()
|
||||
assert len(lines) == 1
|
||||
|
||||
def test_default_writes_both(self, tmp_path):
|
||||
"""Without skip_db, message appears in both JSONL and SQLite."""
|
||||
def test_default_writes_to_sqlite(self, tmp_path):
|
||||
"""Without skip_db, message appears in SQLite."""
|
||||
from gateway.config import GatewayConfig
|
||||
from gateway.session import SessionStore
|
||||
from hermes_state import SessionDB
|
||||
@@ -252,13 +220,7 @@ class TestAppendToTranscriptSkipDb:
|
||||
msg = {"role": "user", "content": "test message"}
|
||||
store.append_to_transcript(session_id, msg)
|
||||
|
||||
# JSONL should have the message
|
||||
jsonl_path = store.get_transcript_path(session_id)
|
||||
with open(jsonl_path) as f:
|
||||
lines = f.readlines()
|
||||
assert len(lines) == 1
|
||||
|
||||
# SQLite should also have the message
|
||||
# SQLite should have the message
|
||||
rows = db.get_messages(session_id)
|
||||
assert len(rows) == 1
|
||||
|
||||
|
||||
@@ -10,10 +10,9 @@ Hermes Agent automatically saves every conversation as a session. Sessions enabl
|
||||
|
||||
## How Sessions Work
|
||||
|
||||
Every conversation — whether from the CLI, Telegram, Discord, Slack, WhatsApp, Signal, Matrix, Teams, or any other messaging platform — is stored as a session with full message history. Sessions are tracked in two complementary systems:
|
||||
Every conversation — whether from the CLI, Telegram, Discord, Slack, WhatsApp, Signal, Matrix, Teams, or any other messaging platform — is stored as a session with full message history. Sessions are tracked in a single canonical store:
|
||||
|
||||
1. **SQLite database** (`~/.hermes/state.db`) — structured session metadata with FTS5 full-text search
|
||||
2. **JSONL transcripts** (`~/.hermes/sessions/`) — raw conversation transcripts including tool calls (gateway)
|
||||
1. **SQLite database** (`~/.hermes/state.db`) — structured session metadata with FTS5 full-text search, plus full message history
|
||||
|
||||
The SQLite database stores:
|
||||
- Session ID, source platform, user ID
|
||||
@@ -488,11 +487,18 @@ Sessions with **active background processes** are never auto-reset, regardless o
|
||||
| What | Path | Description |
|
||||
|------|------|-------------|
|
||||
| SQLite database | `~/.hermes/state.db` | All session metadata + messages with FTS5 |
|
||||
| Gateway transcripts | `~/.hermes/sessions/` | JSONL transcripts per session + sessions.json index |
|
||||
| Gateway index | `~/.hermes/sessions/sessions.json` | Maps session keys to active session IDs |
|
||||
| Gateway messages | `~/.hermes/state.db` | SQLite — canonical store for all session messages |
|
||||
| Gateway routing index | `~/.hermes/sessions/sessions.json` | Maps session keys to active session IDs (origin metadata, expiry flags) |
|
||||
|
||||
The SQLite database uses WAL mode for concurrent readers and a single writer, which suits the gateway's multi-platform architecture well.
|
||||
|
||||
:::note Legacy JSONL transcripts
|
||||
Sessions created before state.db became canonical may have leftover
|
||||
`*.jsonl` files in `~/.hermes/sessions/`. They are no longer written or
|
||||
read by Hermes. They are safe to delete after verifying the corresponding session
|
||||
exists in state.db.
|
||||
:::
|
||||
|
||||
### Database Schema
|
||||
|
||||
Key tables in `state.db`:
|
||||
|
||||
Reference in New Issue
Block a user