Compare commits


1 Commit

Teknium
14a5e56f6f refactor: consolidate gateway session metadata into state.db
Move gateway routing metadata (session_key, platform, chat_type, origin,
display_name, memory_flushed) from sessions.json into the sessions table
in state.db. This eliminates the dual-file dependency that caused the
mcp_serve polling bug (#8925) and makes state.db the single source of
truth for session discovery.

Changes:
- Schema v7 migration: add 6 new columns to sessions table with backfill
  from existing sessions.json during migration
- New SessionDB methods: set_gateway_metadata(), list_gateway_sessions(),
  find_session_by_origin(), set_memory_flushed()
- Gateway session.py: write routing metadata to state.db on every
  session create/reset/switch
- Rewire all consumers (mcp_serve, mirror, channel_directory, status)
  to query state.db first with sessions.json fallback for pre-migration
  databases
- mcp_serve _poll_once: simplified to watch only state.db mtime (fixes
  the split-mtime bug from #8925 as a side effect)

sessions.json continues to be written by the gateway for now but is no
longer read as the primary source by any consumer. Can be made optional
in a future PR.
2026-04-13 05:07:35 -07:00
8 changed files with 347 additions and 28 deletions
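For orientation, the routing metadata being consolidated is one small record per gateway session. A sketch of what a migrated row in the sessions table carries (column names taken from the v7 schema change below; the id and values are purely illustrative):

# Illustrative gateway-session row after the v7 migration.
# Column names match the schema change below; every value here is made up.
example_row = {
    "id": "sess-0001",                         # hypothetical sessions.id
    "session_key": "agent:main:telegram:dm",   # key format quoted from the mirror docstring below
    "platform": "telegram",
    "chat_type": "dm",
    "origin_json": '{"chat_id": "123456789", "thread_id": null}',
    "display_name": "Example DM",
    "memory_flushed": 0,                       # INTEGER DEFAULT 0, read as a boolean by consumers
}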

View File

@@ -145,12 +145,49 @@ def _build_slack(adapter) -> List[Dict[str, str]]:
def _build_from_sessions(platform_name: str) -> List[Dict[str, str]]:
"""Pull known channels/contacts from sessions.json origin data."""
"""Pull known channels/contacts from state.db gateway metadata.
Falls back to sessions.json for pre-migration databases.
"""
entries = []
# Primary: query state.db
try:
from hermes_state import SessionDB
db = SessionDB()
try:
rows = db.list_gateway_sessions(platform=platform_name)
finally:
db.close()
if rows:
seen_ids = set()
for row in rows:
origin_json = row.get("origin_json")
if not origin_json:
continue
try:
origin = json.loads(origin_json)
except (json.JSONDecodeError, TypeError):
continue
entry_id = _session_entry_id(origin)
if not entry_id or entry_id in seen_ids:
continue
seen_ids.add(entry_id)
entries.append({
"id": entry_id,
"name": _session_entry_name(origin),
"type": row.get("chat_type", "dm"),
"thread_id": origin.get("thread_id"),
})
return entries
except Exception as e:
logger.debug("Channel directory: state.db lookup failed, falling back: %s", e)
# Fallback: read sessions.json
sessions_path = get_hermes_home() / "sessions" / "sessions.json"
if not sessions_path.exists():
return []
entries = []
try:
with open(sessions_path, encoding="utf-8") as f:
data = json.load(f)
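For reference, the entries this function returns on the state.db path have the shape sketched below; the keys mirror the entries.append(...) call above, and the values are illustrative.

# Illustrative return value of _build_from_sessions("telegram");
# keys mirror the entries.append(...) call above, values are made up.
[
    {
        "id": "123456789",      # whatever _session_entry_id(origin) yields
        "name": "Example DM",   # whatever _session_entry_name(origin) yields
        "type": "dm",           # row.get("chat_type", "dm")
        "thread_id": None,      # origin.get("thread_id")
    },
]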

View File

@@ -67,10 +67,23 @@ def _find_session_id(platform: str, chat_id: str, thread_id: Optional[str] = Non
"""
Find the active session_id for a platform + chat_id pair.
Scans sessions.json entries and matches where origin.chat_id == chat_id
on the right platform. DM session keys don't embed the chat_id
(e.g. "agent:main:telegram:dm"), so we check the origin dict.
Queries state.db for matching sessions. Falls back to sessions.json
for pre-migration databases.
"""
# Primary: query state.db
try:
from hermes_state import SessionDB
db = SessionDB()
try:
row = db.find_session_by_origin(platform, chat_id, thread_id=thread_id)
finally:
db.close()
if row:
return row.get("id")
except Exception as e:
logger.debug("Mirror: state.db lookup failed, falling back to sessions.json: %s", e)
# Fallback: read sessions.json
if not _SESSIONS_INDEX.exists():
return None

View File

@@ -1819,6 +1819,11 @@ class GatewayRunner:
with self.session_store._lock:
entry.memory_flushed = True
self.session_store._save()
if self._session_db:
try:
self._session_db.set_memory_flushed(entry.session_id)
except Exception:
pass
logger.debug(
"Memory flush completed for session %s",
entry.session_id,
@@ -1836,6 +1841,11 @@ class GatewayRunner:
with self.session_store._lock:
entry.memory_flushed = True
self.session_store._save()
if self._session_db:
try:
self._session_db.set_memory_flushed(entry.session_id)
except Exception:
pass
_flush_failures.pop(entry.session_id, None)
else:
logger.debug(

View File

@@ -764,6 +764,19 @@ class SessionStore:
self._db.create_session(**db_create_kwargs)
except Exception as e:
print(f"[gateway] Warning: Failed to create SQLite session: {e}")
# Write gateway routing metadata to state.db so it can serve
# as the single source of truth (replacing sessions.json reads).
try:
self._db.set_gateway_metadata(
session_id=entry.session_id,
session_key=entry.session_key,
platform=entry.platform.value if entry.platform else None,
chat_type=entry.chat_type,
origin_json=json.dumps(entry.origin.to_dict()) if entry.origin else None,
display_name=entry.display_name,
)
except Exception as e:
logger.debug("Failed to write gateway metadata to state.db: %s", e)
return entry
@@ -869,6 +882,17 @@ class SessionStore:
self._db.create_session(**db_create_kwargs)
except Exception as e:
logger.debug("Session DB operation failed: %s", e)
try:
self._db.set_gateway_metadata(
session_id=new_entry.session_id,
session_key=new_entry.session_key,
platform=new_entry.platform.value if new_entry.platform else None,
chat_type=new_entry.chat_type,
origin_json=json.dumps(new_entry.origin.to_dict()) if new_entry.origin else None,
display_name=new_entry.display_name,
)
except Exception as e:
logger.debug("Failed to write gateway metadata to state.db: %s", e)
return new_entry
@@ -918,6 +942,20 @@ class SessionStore:
except Exception as e:
logger.debug("Session DB end_session failed: %s", e)
# Update gateway metadata on the target session
if self._db and new_entry:
try:
self._db.set_gateway_metadata(
session_id=new_entry.session_id,
session_key=new_entry.session_key,
platform=new_entry.platform.value if new_entry.platform else None,
chat_type=new_entry.chat_type,
origin_json=json.dumps(new_entry.origin.to_dict()) if new_entry.origin else None,
display_name=new_entry.display_name,
)
except Exception as e:
logger.debug("Failed to write gateway metadata to state.db: %s", e)
return new_entry
def list_sessions(self, active_minutes: Optional[int] = None) -> List[SessionEntry]:

View File

@@ -421,7 +421,21 @@ def show_status(args):
print(color("◆ Sessions", Colors.CYAN, Colors.BOLD))
sessions_file = get_hermes_home() / "sessions" / "sessions.json"
if sessions_file.exists():
# Primary: count gateway sessions from state.db
_session_count_shown = False
try:
from hermes_state import SessionDB
_sdb = SessionDB()
try:
_gw_sessions = _sdb.list_gateway_sessions()
finally:
_sdb.close()
print(f" Active: {len(_gw_sessions)} session(s)")
_session_count_shown = True
except Exception:
pass
# Fallback: sessions.json
if not _session_count_shown and sessions_file.exists():
import json
try:
with open(sessions_file, encoding="utf-8") as f:

View File

@@ -31,7 +31,7 @@ T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
SCHEMA_VERSION = 6
SCHEMA_VERSION = 7
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@@ -65,6 +65,12 @@ CREATE TABLE IF NOT EXISTS sessions (
cost_source TEXT,
pricing_version TEXT,
title TEXT,
session_key TEXT,
platform TEXT,
chat_type TEXT,
origin_json TEXT,
display_name TEXT,
memory_flushed INTEGER DEFAULT 0,
FOREIGN KEY (parent_session_id) REFERENCES sessions(id)
);
@@ -329,6 +335,36 @@ class SessionDB:
except sqlite3.OperationalError:
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 6")
if current_version < 7:
# v7: add gateway routing metadata columns to sessions table.
# These columns allow state.db to serve as the single source of
# truth for session discovery, replacing sessions.json reads.
for col_name, col_type in [
("session_key", "TEXT"),
("platform", "TEXT"),
("chat_type", "TEXT"),
("origin_json", "TEXT"),
("display_name", "TEXT"),
("memory_flushed", "INTEGER DEFAULT 0"),
]:
try:
safe = col_name.replace('"', '""')
cursor.execute(
f'ALTER TABLE sessions ADD COLUMN "{safe}" {col_type}'
)
except sqlite3.OperationalError:
pass # Column already exists
# Create index on session_key for fast lookups
try:
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sessions_session_key "
"ON sessions(session_key)"
)
except sqlite3.OperationalError:
pass
# Backfill from sessions.json if it exists
self._backfill_gateway_metadata_v7(cursor)
cursor.execute("UPDATE schema_version SET version = 7")
# Unique title index — always ensure it exists (safe to run after migrations
# since the title column is guaranteed to exist at this point)
@@ -340,6 +376,15 @@ class SessionDB:
except sqlite3.OperationalError:
pass # Index already exists
# session_key index for gateway metadata lookups
try:
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sessions_session_key "
"ON sessions(session_key)"
)
except sqlite3.OperationalError:
pass
# FTS5 setup (separate because CREATE VIRTUAL TABLE can't be in executescript with IF NOT EXISTS reliably)
try:
cursor.execute("SELECT * FROM messages_fts LIMIT 0")
@@ -348,6 +393,37 @@ class SessionDB:
self._conn.commit()
def _backfill_gateway_metadata_v7(self, cursor):
"""Backfill gateway routing metadata from sessions.json during v7 migration."""
try:
sessions_dir = get_hermes_home() / "sessions"
sessions_file = sessions_dir / "sessions.json"
if not sessions_file.exists():
return
with open(sessions_file, "r", encoding="utf-8") as f:
data = json.load(f)
for _key, entry in data.items():
session_id = entry.get("session_id", "")
if not session_id:
continue
session_key = entry.get("session_key", _key)
platform = entry.get("platform", "")
chat_type = entry.get("chat_type", "dm")
display_name = entry.get("display_name")
origin = entry.get("origin")
origin_json = json.dumps(origin) if origin else None
memory_flushed = 1 if entry.get("memory_flushed") else 0
cursor.execute(
"""UPDATE sessions SET
session_key = ?, platform = ?, chat_type = ?,
origin_json = ?, display_name = ?, memory_flushed = ?
WHERE id = ? AND session_key IS NULL""",
(session_key, platform, chat_type, origin_json,
display_name, memory_flushed, session_id),
)
except Exception as e:
logger.debug("v7 backfill from sessions.json failed (non-fatal): %s", e)
# =========================================================================
# Session lifecycle
# =========================================================================
@@ -382,6 +458,112 @@ class SessionDB:
self._execute_write(_do)
return session_id
def set_gateway_metadata(
self,
session_id: str,
session_key: str = None,
platform: str = None,
chat_type: str = None,
origin_json: str = None,
display_name: str = None,
) -> None:
"""Write gateway routing metadata for a session.
Called by the gateway after creating or resuming a session so that
state.db becomes the single source of truth for session discovery.
Uses UPDATE (not UPSERT) — the session row must already exist.
"""
sets = []
params = []
if session_key is not None:
sets.append("session_key = ?")
params.append(session_key)
if platform is not None:
sets.append("platform = ?")
params.append(platform)
if chat_type is not None:
sets.append("chat_type = ?")
params.append(chat_type)
if origin_json is not None:
sets.append("origin_json = ?")
params.append(origin_json)
if display_name is not None:
sets.append("display_name = ?")
params.append(display_name)
if not sets:
return
params.append(session_id)
sql = f"UPDATE sessions SET {', '.join(sets)} WHERE id = ?"
def _do(conn):
conn.execute(sql, params)
self._execute_write(_do)
def set_memory_flushed(self, session_id: str, flushed: bool = True) -> None:
"""Mark a session as having its memory flushed."""
def _do(conn):
conn.execute(
"UPDATE sessions SET memory_flushed = ? WHERE id = ?",
(1 if flushed else 0, session_id),
)
self._execute_write(_do)
def list_gateway_sessions(self, platform: str = None) -> List[Dict[str, Any]]:
"""List sessions that have gateway routing metadata.
Returns dicts with: id, session_key, platform, chat_type,
origin_json, display_name, source, started_at, ended_at, title,
message_count, memory_flushed.
When ``platform`` is given, only sessions for that platform are returned.
Only sessions with a non-NULL session_key are included (i.e. sessions
that were created through the gateway, not bare CLI sessions).
"""
where = "WHERE session_key IS NOT NULL"
params = []
if platform:
where += " AND platform = ?"
params.append(platform)
with self._lock:
rows = self._conn.execute(
f"""SELECT id, session_key, platform, chat_type, origin_json,
display_name, source, started_at, ended_at, title,
message_count, memory_flushed
FROM sessions {where}
ORDER BY started_at DESC""",
params,
).fetchall()
return [dict(r) for r in rows]
def find_session_by_origin(
self, platform: str, chat_id: str, thread_id: str = None,
) -> Optional[Dict[str, Any]]:
"""Find the most recent session for a platform + chat_id pair.
Searches the origin_json column for matching chat_id. When
``thread_id`` is given, also matches on thread_id. Returns the
session dict or None.
"""
# Use JSON extraction for matching. SQLite json_extract is
# available in all modern builds (3.9+).
sql = """
SELECT id, session_key, platform, chat_type, origin_json,
display_name, source, started_at, ended_at, title,
memory_flushed
FROM sessions
WHERE platform = ?
AND json_extract(origin_json, '$.chat_id') = ?
AND session_key IS NOT NULL
"""
params: list = [platform, str(chat_id)]
if thread_id is not None:
sql += " AND json_extract(origin_json, '$.thread_id') = ?"
params.append(str(thread_id))
sql += " ORDER BY started_at DESC LIMIT 1"
with self._lock:
row = self._conn.execute(sql, params).fetchone()
return dict(row) if row else None
def end_session(self, session_id: str, end_reason: str) -> None:
"""Mark a session as ended."""
def _do(conn):
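Taken together, the new gateway-metadata surface on SessionDB is small. A minimal usage sketch, assuming the session row already exists (set_gateway_metadata is a plain UPDATE, so the gateway creates the row first, as the SessionStore hunks above show) and that the ids and values below are illustrative:

import json
from hermes_state import SessionDB

db = SessionDB()
try:
    # Record routing metadata after the session row has been created.
    # (A plain UPDATE: it silently touches zero rows for an unknown id.)
    db.set_gateway_metadata(
        session_id="sess-0001",                      # hypothetical id
        session_key="agent:main:telegram:dm",
        platform="telegram",
        chat_type="dm",
        origin_json=json.dumps({"chat_id": "123456789"}),
        display_name="Example DM",
    )

    # Consumers enumerate gateway sessions, optionally per platform.
    for row in db.list_gateway_sessions(platform="telegram"):
        print(row["session_key"], row["chat_type"], row["display_name"])

    # Or resolve a single session from its origin, as the mirror does,
    # and flag it once its memory has been flushed.
    match = db.find_session_by_origin("telegram", "123456789")
    if match:
        db.set_memory_flushed(match["id"])
finally:
    db.close()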

View File

@@ -79,11 +79,45 @@ def _get_session_db():
def _load_sessions_index() -> dict:
"""Load the gateway sessions.json index directly.
"""Load gateway session metadata from state.db.
Returns a dict of session_key -> entry_dict with platform routing info.
This avoids importing the full SessionStore which needs GatewayConfig.
Falls back to reading sessions.json when state.db has no gateway metadata
(pre-migration databases).
"""
try:
from hermes_state import SessionDB
db = SessionDB()
try:
rows = db.list_gateway_sessions()
finally:
db.close()
if rows:
result = {}
for row in rows:
sk = row.get("session_key")
if not sk:
continue
entry = {
"session_key": sk,
"session_id": row.get("id", ""),
"platform": row.get("platform", ""),
"chat_type": row.get("chat_type", "dm"),
"display_name": row.get("display_name"),
"memory_flushed": bool(row.get("memory_flushed", 0)),
}
origin_json = row.get("origin_json")
if origin_json:
try:
entry["origin"] = json.loads(origin_json)
except (json.JSONDecodeError, TypeError):
pass
result[sk] = entry
return result
except Exception as e:
logger.debug("Failed to load sessions from state.db: %s", e)
# Fallback: read sessions.json for pre-migration databases
sessions_file = _get_sessions_dir() / "sessions.json"
if not sessions_file.exists():
return {}
@@ -200,8 +234,7 @@ class EventBridge:
self._last_poll_timestamps: Dict[str, float] = {} # session_key -> unix timestamp
# In-memory approval tracking (populated from events)
self._pending_approvals: Dict[str, dict] = {}
# mtime cache — skip expensive work when files haven't changed
self._sessions_json_mtime: float = 0.0
# mtime cache — skip expensive work when state.db hasn't changed
self._state_db_mtime: float = 0.0
self._cached_sessions_index: dict = {}
@@ -327,21 +360,10 @@ class EventBridge:
def _poll_once(self, db):
"""Check for new messages across all sessions.
Uses mtime checks on sessions.json and state.db to skip work
when nothing has changed — makes 200ms polling essentially free.
Uses mtime check on state.db to skip work when nothing has changed
— makes 200ms polling essentially free.
"""
# Check if sessions.json has changed (mtime check is ~1μs)
sessions_file = _get_sessions_dir() / "sessions.json"
try:
sj_mtime = sessions_file.stat().st_mtime if sessions_file.exists() else 0.0
except OSError:
sj_mtime = 0.0
if sj_mtime != self._sessions_json_mtime:
self._sessions_json_mtime = sj_mtime
self._cached_sessions_index = _load_sessions_index()
# Check if state.db has changed
# Check if state.db has changed (mtime check is ~1μs)
try:
from hermes_constants import get_hermes_home
db_file = get_hermes_home() / "state.db"
@@ -353,10 +375,13 @@ class EventBridge:
except OSError:
db_mtime = 0.0
if db_mtime == self._state_db_mtime and sj_mtime == self._sessions_json_mtime:
if db_mtime == self._state_db_mtime:
return # Nothing changed since last poll — skip entirely
self._state_db_mtime = db_mtime
# Reload the session index from state.db on every change since
# new sessions may have been created.
self._cached_sessions_index = _load_sessions_index()
entries = self._cached_sessions_index
for session_key, entry in entries.items():

View File

@@ -935,7 +935,7 @@ class TestSchemaInit:
def test_schema_version(self, db):
cursor = db._conn.execute("SELECT version FROM schema_version")
version = cursor.fetchone()[0]
assert version == 6
assert version == 7
def test_title_column_exists(self, db):
"""Verify the title column was created in the sessions table."""
@@ -996,7 +996,7 @@ class TestSchemaInit:
# Verify migration
cursor = migrated_db._conn.execute("SELECT version FROM schema_version")
assert cursor.fetchone()[0] == 6
assert cursor.fetchone()[0] == 7
# Verify title column exists and is NULL for existing sessions
session = migrated_db.get_session("existing")
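For manual verification outside the test suite, a quick sanity check against an upgraded database could look like the sketch below; it assumes state.db lives at get_hermes_home() / "state.db" as in the diff above and touches only the schema shown in this commit.

import sqlite3
from hermes_constants import get_hermes_home

conn = sqlite3.connect(str(get_hermes_home() / "state.db"))
try:
    version = conn.execute("SELECT version FROM schema_version").fetchone()[0]
    columns = {row[1] for row in conn.execute("PRAGMA table_info(sessions)")}
    assert version == 7, f"expected schema v7, found v{version}"
    assert {"session_key", "platform", "chat_type", "origin_json",
            "display_name", "memory_flushed"} <= columns
finally:
    conn.close()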