Compare commits

...

1 Commits

Author SHA1 Message Date
Teknium
564081ca5e fix: SQLite WAL write-lock contention causing 15-20s TUI freeze
Multiple hermes processes (gateway + CLI sessions + worktree agents) sharing
one state.db caused WAL write-lock convoy effects. SQLite's built-in busy
handler uses deterministic sleep intervals (up to 100ms) that synchronize
competing writers, creating 15-20 second freezes during agent init.

Root cause: timeout=30.0 with 7+ concurrent connections meant:
- WAL never checkpointed (294MB, readers always blocked it)
- Bloated WAL slowed all reads and writes
- Deterministic backoff caused convoy effects under contention

Fix:
- Replace 30s SQLite timeout with 1s + app-level retry (15 attempts,
  random 20-150ms jitter between retries to break convoys)
- Use BEGIN IMMEDIATE for explicit write-lock acquisition (fail fast)
- Set isolation_level=None for manual transaction control
- PASSIVE WAL checkpoint on close() and every 50 writes
- All 12 write methods converted to _execute_write() helper

Before: 15-20s frozen at create_session during agent init
After:  <1s to API call, WAL stays at ~4MB

Tested: 4355 tests pass, 3 concurrent live sessions with simultaneous
writes showed zero contention on every py-spy sample.
2026-03-27 05:22:31 -07:00

View File

@@ -15,15 +15,20 @@ Key design decisions:
""" """
import json import json
import logging
import os import os
import random
import re import re
import sqlite3 import sqlite3
import threading import threading
import time import time
from pathlib import Path from pathlib import Path
from hermes_constants import get_hermes_home from hermes_constants import get_hermes_home
from typing import Dict, Any, List, Optional from typing import Any, Callable, Dict, List, Optional, TypeVar
logger = logging.getLogger(__name__)
T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db" DEFAULT_DB_PATH = get_hermes_home() / "state.db"
@@ -116,18 +121,38 @@ class SessionDB:
single writer via WAL mode). Each method opens its own cursor. single writer via WAL mode). Each method opens its own cursor.
""" """
# ── Write-contention tuning ──
# With multiple hermes processes (gateway + CLI sessions + worktree agents)
# all sharing one state.db, WAL write-lock contention causes visible TUI
# freezes. SQLite's built-in busy handler uses a deterministic sleep
# schedule that causes convoy effects under high concurrency.
#
# Instead, we keep the SQLite timeout short (1s) and handle retries at the
# application level with random jitter, which naturally staggers competing
# writers and avoids the convoy.
_WRITE_MAX_RETRIES = 15
_WRITE_RETRY_MIN_S = 0.020 # 20ms
_WRITE_RETRY_MAX_S = 0.150 # 150ms
# Attempt a PASSIVE WAL checkpoint every N successful writes.
_CHECKPOINT_EVERY_N_WRITES = 50
def __init__(self, db_path: Path = None): def __init__(self, db_path: Path = None):
self.db_path = db_path or DEFAULT_DB_PATH self.db_path = db_path or DEFAULT_DB_PATH
self.db_path.parent.mkdir(parents=True, exist_ok=True) self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._lock = threading.Lock() self._lock = threading.Lock()
self._write_count = 0
self._conn = sqlite3.connect( self._conn = sqlite3.connect(
str(self.db_path), str(self.db_path),
check_same_thread=False, check_same_thread=False,
# 30s gives the WAL writer (CLI or gateway) time to finish a batch # Short timeout — application-level retry with random jitter
# flush before the concurrent reader/writer gives up. 10s was too # handles contention instead of sitting in SQLite's internal
# short when the CLI is doing frequent memory flushes. # busy handler for up to 30s.
timeout=30.0, timeout=1.0,
# Autocommit mode: Python's default isolation_level="" auto-starts
# transactions on DML, which conflicts with our explicit
# BEGIN IMMEDIATE. None = we manage transactions ourselves.
isolation_level=None,
) )
self._conn.row_factory = sqlite3.Row self._conn.row_factory = sqlite3.Row
self._conn.execute("PRAGMA journal_mode=WAL") self._conn.execute("PRAGMA journal_mode=WAL")
@@ -135,6 +160,96 @@ class SessionDB:
self._init_schema() self._init_schema()
# ── Core write helper ──
def _execute_write(self, fn: Callable[[sqlite3.Connection], T]) -> T:
"""Execute a write transaction with BEGIN IMMEDIATE and jitter retry.
*fn* receives the connection and should perform INSERT/UPDATE/DELETE
statements. The caller must NOT call ``commit()`` — that's handled
here after *fn* returns.
BEGIN IMMEDIATE acquires the WAL write lock at transaction start
(not at commit time), so lock contention surfaces immediately.
On ``database is locked``, we release the Python lock, sleep a
random 20-150ms, and retry — breaking the convoy pattern that
SQLite's built-in deterministic backoff creates.
Returns whatever *fn* returns.
"""
last_err: Optional[Exception] = None
for attempt in range(self._WRITE_MAX_RETRIES):
try:
with self._lock:
self._conn.execute("BEGIN IMMEDIATE")
try:
result = fn(self._conn)
self._conn.commit()
except BaseException:
try:
self._conn.rollback()
except Exception:
pass
raise
# Success — periodic best-effort checkpoint.
self._write_count += 1
if self._write_count % self._CHECKPOINT_EVERY_N_WRITES == 0:
self._try_wal_checkpoint()
return result
except sqlite3.OperationalError as exc:
err_msg = str(exc).lower()
if "locked" in err_msg or "busy" in err_msg:
last_err = exc
if attempt < self._WRITE_MAX_RETRIES - 1:
jitter = random.uniform(
self._WRITE_RETRY_MIN_S,
self._WRITE_RETRY_MAX_S,
)
time.sleep(jitter)
continue
# Non-lock error or retries exhausted — propagate.
raise
# Retries exhausted (shouldn't normally reach here).
raise last_err or sqlite3.OperationalError(
"database is locked after max retries"
)
def _try_wal_checkpoint(self) -> None:
"""Best-effort PASSIVE WAL checkpoint. Never blocks, never raises.
Flushes committed WAL frames back into the main DB file for any
frames that no other connection currently needs. Keeps the WAL
from growing unbounded when many processes hold persistent
connections.
"""
try:
with self._lock:
result = self._conn.execute(
"PRAGMA wal_checkpoint(PASSIVE)"
).fetchone()
if result and result[1] > 0:
logger.debug(
"WAL checkpoint: %d/%d pages checkpointed",
result[2], result[1],
)
except Exception:
pass # Best effort — never fatal.
def close(self):
"""Close the database connection.
Attempts a PASSIVE WAL checkpoint first so that exiting processes
help keep the WAL file from growing unbounded.
"""
with self._lock:
if self._conn:
try:
self._conn.execute("PRAGMA wal_checkpoint(PASSIVE)")
except Exception:
pass
self._conn.close()
self._conn = None
def _init_schema(self): def _init_schema(self):
"""Create tables and FTS if they don't exist, run migrations.""" """Create tables and FTS if they don't exist, run migrations."""
cursor = self._conn.cursor() cursor = self._conn.cursor()
@@ -256,8 +371,8 @@ class SessionDB:
parent_session_id: str = None, parent_session_id: str = None,
) -> str: ) -> str:
"""Create a new session record. Returns the session_id.""" """Create a new session record. Returns the session_id."""
with self._lock: def _do(conn):
self._conn.execute( conn.execute(
"""INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config, """INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config,
system_prompt, parent_session_id, started_at) system_prompt, parent_session_id, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
@@ -272,35 +387,35 @@ class SessionDB:
time.time(), time.time(),
), ),
) )
self._conn.commit() self._execute_write(_do)
return session_id return session_id
def end_session(self, session_id: str, end_reason: str) -> None: def end_session(self, session_id: str, end_reason: str) -> None:
"""Mark a session as ended.""" """Mark a session as ended."""
with self._lock: def _do(conn):
self._conn.execute( conn.execute(
"UPDATE sessions SET ended_at = ?, end_reason = ? WHERE id = ?", "UPDATE sessions SET ended_at = ?, end_reason = ? WHERE id = ?",
(time.time(), end_reason, session_id), (time.time(), end_reason, session_id),
) )
self._conn.commit() self._execute_write(_do)
def reopen_session(self, session_id: str) -> None: def reopen_session(self, session_id: str) -> None:
"""Clear ended_at/end_reason so a session can be resumed.""" """Clear ended_at/end_reason so a session can be resumed."""
with self._lock: def _do(conn):
self._conn.execute( conn.execute(
"UPDATE sessions SET ended_at = NULL, end_reason = NULL WHERE id = ?", "UPDATE sessions SET ended_at = NULL, end_reason = NULL WHERE id = ?",
(session_id,), (session_id,),
) )
self._conn.commit() self._execute_write(_do)
def update_system_prompt(self, session_id: str, system_prompt: str) -> None: def update_system_prompt(self, session_id: str, system_prompt: str) -> None:
"""Store the full assembled system prompt snapshot.""" """Store the full assembled system prompt snapshot."""
with self._lock: def _do(conn):
self._conn.execute( conn.execute(
"UPDATE sessions SET system_prompt = ? WHERE id = ?", "UPDATE sessions SET system_prompt = ? WHERE id = ?",
(system_prompt, session_id), (system_prompt, session_id),
) )
self._conn.commit() self._execute_write(_do)
def update_token_counts( def update_token_counts(
self, self,
@@ -370,10 +485,7 @@ class SessionDB:
billing_mode = COALESCE(billing_mode, ?), billing_mode = COALESCE(billing_mode, ?),
model = COALESCE(model, ?) model = COALESCE(model, ?)
WHERE id = ?""" WHERE id = ?"""
with self._lock: params = (
self._conn.execute(
sql,
(
input_tokens, input_tokens,
output_tokens, output_tokens,
cache_read_tokens, cache_read_tokens,
@@ -390,9 +502,10 @@ class SessionDB:
billing_mode, billing_mode,
model, model,
session_id, session_id,
),
) )
self._conn.commit() def _do(conn):
conn.execute(sql, params)
self._execute_write(_do)
def ensure_session( def ensure_session(
self, self,
@@ -406,14 +519,14 @@ class SessionDB:
create_session() call (e.g. transient SQLite lock at agent startup). create_session() call (e.g. transient SQLite lock at agent startup).
INSERT OR IGNORE is safe to call even when the row already exists. INSERT OR IGNORE is safe to call even when the row already exists.
""" """
with self._lock: def _do(conn):
self._conn.execute( conn.execute(
"""INSERT OR IGNORE INTO sessions """INSERT OR IGNORE INTO sessions
(id, source, model, started_at) (id, source, model, started_at)
VALUES (?, ?, ?, ?)""", VALUES (?, ?, ?, ?)""",
(session_id, source, model, time.time()), (session_id, source, model, time.time()),
) )
self._conn.commit() self._execute_write(_do)
def set_token_counts( def set_token_counts(
self, self,
@@ -439,8 +552,8 @@ class SessionDB:
conversation run (e.g. the gateway, where the cached agent's conversation run (e.g. the gateway, where the cached agent's
session_prompt_tokens already reflects the running total). session_prompt_tokens already reflects the running total).
""" """
with self._lock: def _do(conn):
self._conn.execute( conn.execute(
"""UPDATE sessions SET """UPDATE sessions SET
input_tokens = ?, input_tokens = ?,
output_tokens = ?, output_tokens = ?,
@@ -479,7 +592,7 @@ class SessionDB:
session_id, session_id,
), ),
) )
self._conn.commit() self._execute_write(_do)
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
"""Get a session by ID.""" """Get a session by ID."""
@@ -573,10 +686,10 @@ class SessionDB:
Empty/whitespace-only strings are normalized to None (clearing the title). Empty/whitespace-only strings are normalized to None (clearing the title).
""" """
title = self.sanitize_title(title) title = self.sanitize_title(title)
with self._lock: def _do(conn):
if title: if title:
# Check uniqueness (allow the same session to keep its own title) # Check uniqueness (allow the same session to keep its own title)
cursor = self._conn.execute( cursor = conn.execute(
"SELECT id FROM sessions WHERE title = ? AND id != ?", "SELECT id FROM sessions WHERE title = ? AND id != ?",
(title, session_id), (title, session_id),
) )
@@ -585,12 +698,12 @@ class SessionDB:
raise ValueError( raise ValueError(
f"Title '{title}' is already in use by session {conflict['id']}" f"Title '{title}' is already in use by session {conflict['id']}"
) )
cursor = self._conn.execute( cursor = conn.execute(
"UPDATE sessions SET title = ? WHERE id = ?", "UPDATE sessions SET title = ? WHERE id = ?",
(title, session_id), (title, session_id),
) )
self._conn.commit() return cursor.rowcount
rowcount = cursor.rowcount rowcount = self._execute_write(_do)
return rowcount > 0 return rowcount > 0
def get_session_title(self, session_id: str) -> Optional[str]: def get_session_title(self, session_id: str) -> Optional[str]:
@@ -762,8 +875,7 @@ class SessionDB:
Also increments the session's message_count (and tool_call_count Also increments the session's message_count (and tool_call_count
if role is 'tool' or tool_calls is present). if role is 'tool' or tool_calls is present).
""" """
with self._lock: # Serialize structured fields to JSON before entering the write txn
# Serialize structured fields to JSON for storage
reasoning_details_json = ( reasoning_details_json = (
json.dumps(reasoning_details) json.dumps(reasoning_details)
if reasoning_details else None if reasoning_details else None
@@ -772,7 +884,15 @@ class SessionDB:
json.dumps(codex_reasoning_items) json.dumps(codex_reasoning_items)
if codex_reasoning_items else None if codex_reasoning_items else None
) )
cursor = self._conn.execute( tool_calls_json = json.dumps(tool_calls) if tool_calls else None
# Pre-compute tool call count
num_tool_calls = 0
if tool_calls is not None:
num_tool_calls = len(tool_calls) if isinstance(tool_calls, list) else 1
def _do(conn):
cursor = conn.execute(
"""INSERT INTO messages (session_id, role, content, tool_call_id, """INSERT INTO messages (session_id, role, content, tool_call_id,
tool_calls, tool_name, timestamp, token_count, finish_reason, tool_calls, tool_name, timestamp, token_count, finish_reason,
reasoning, reasoning_details, codex_reasoning_items) reasoning, reasoning_details, codex_reasoning_items)
@@ -782,7 +902,7 @@ class SessionDB:
role, role,
content, content,
tool_call_id, tool_call_id,
json.dumps(tool_calls) if tool_calls else None, tool_calls_json,
tool_name, tool_name,
time.time(), time.time(),
token_count, token_count,
@@ -795,26 +915,21 @@ class SessionDB:
msg_id = cursor.lastrowid msg_id = cursor.lastrowid
# Update counters # Update counters
# Count actual tool calls from the tool_calls list (not from tool responses).
# A single assistant message can contain multiple parallel tool calls.
num_tool_calls = 0
if tool_calls is not None:
num_tool_calls = len(tool_calls) if isinstance(tool_calls, list) else 1
if num_tool_calls > 0: if num_tool_calls > 0:
self._conn.execute( conn.execute(
"""UPDATE sessions SET message_count = message_count + 1, """UPDATE sessions SET message_count = message_count + 1,
tool_call_count = tool_call_count + ? WHERE id = ?""", tool_call_count = tool_call_count + ? WHERE id = ?""",
(num_tool_calls, session_id), (num_tool_calls, session_id),
) )
else: else:
self._conn.execute( conn.execute(
"UPDATE sessions SET message_count = message_count + 1 WHERE id = ?", "UPDATE sessions SET message_count = message_count + 1 WHERE id = ?",
(session_id,), (session_id,),
) )
self._conn.commit()
return msg_id return msg_id
return self._execute_write(_do)
def get_messages(self, session_id: str) -> List[Dict[str, Any]]: def get_messages(self, session_id: str) -> List[Dict[str, Any]]:
"""Load all messages for a session, ordered by timestamp.""" """Load all messages for a session, ordered by timestamp."""
with self._lock: with self._lock:
@@ -1107,54 +1222,53 @@ class SessionDB:
def clear_messages(self, session_id: str) -> None: def clear_messages(self, session_id: str) -> None:
"""Delete all messages for a session and reset its counters.""" """Delete all messages for a session and reset its counters."""
with self._lock: def _do(conn):
self._conn.execute( conn.execute(
"DELETE FROM messages WHERE session_id = ?", (session_id,) "DELETE FROM messages WHERE session_id = ?", (session_id,)
) )
self._conn.execute( conn.execute(
"UPDATE sessions SET message_count = 0, tool_call_count = 0 WHERE id = ?", "UPDATE sessions SET message_count = 0, tool_call_count = 0 WHERE id = ?",
(session_id,), (session_id,),
) )
self._conn.commit() self._execute_write(_do)
def delete_session(self, session_id: str) -> bool: def delete_session(self, session_id: str) -> bool:
"""Delete a session and all its messages. Returns True if found.""" """Delete a session and all its messages. Returns True if found."""
with self._lock: def _do(conn):
cursor = self._conn.execute( cursor = conn.execute(
"SELECT COUNT(*) FROM sessions WHERE id = ?", (session_id,) "SELECT COUNT(*) FROM sessions WHERE id = ?", (session_id,)
) )
if cursor.fetchone()[0] == 0: if cursor.fetchone()[0] == 0:
return False return False
self._conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,)) conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
self._conn.execute("DELETE FROM sessions WHERE id = ?", (session_id,)) conn.execute("DELETE FROM sessions WHERE id = ?", (session_id,))
self._conn.commit()
return True return True
return self._execute_write(_do)
def prune_sessions(self, older_than_days: int = 90, source: str = None) -> int: def prune_sessions(self, older_than_days: int = 90, source: str = None) -> int:
""" """
Delete sessions older than N days. Returns count of deleted sessions. Delete sessions older than N days. Returns count of deleted sessions.
Only prunes ended sessions (not active ones). Only prunes ended sessions (not active ones).
""" """
import time as _time cutoff = time.time() - (older_than_days * 86400)
cutoff = _time.time() - (older_than_days * 86400)
with self._lock: def _do(conn):
if source: if source:
cursor = self._conn.execute( cursor = conn.execute(
"""SELECT id FROM sessions """SELECT id FROM sessions
WHERE started_at < ? AND ended_at IS NOT NULL AND source = ?""", WHERE started_at < ? AND ended_at IS NOT NULL AND source = ?""",
(cutoff, source), (cutoff, source),
) )
else: else:
cursor = self._conn.execute( cursor = conn.execute(
"SELECT id FROM sessions WHERE started_at < ? AND ended_at IS NOT NULL", "SELECT id FROM sessions WHERE started_at < ? AND ended_at IS NOT NULL",
(cutoff,), (cutoff,),
) )
session_ids = [row["id"] for row in cursor.fetchall()] session_ids = [row["id"] for row in cursor.fetchall()]
for sid in session_ids: for sid in session_ids:
self._conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,)) conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
self._conn.execute("DELETE FROM sessions WHERE id = ?", (sid,)) conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
self._conn.commit()
return len(session_ids) return len(session_ids)
return self._execute_write(_do)