mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-01 16:31:56 +08:00
Compare commits
1 Commits
fix/plugin
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
564081ca5e |
294
hermes_state.py
294
hermes_state.py
@@ -15,15 +15,20 @@ Key design decisions:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import random
|
||||||
import re
|
import re
|
||||||
import sqlite3
|
import sqlite3
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from hermes_constants import get_hermes_home
|
from hermes_constants import get_hermes_home
|
||||||
from typing import Dict, Any, List, Optional
|
from typing import Any, Callable, Dict, List, Optional, TypeVar
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
T = TypeVar("T")
|
||||||
|
|
||||||
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
|
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
|
||||||
|
|
||||||
@@ -116,18 +121,38 @@ class SessionDB:
|
|||||||
single writer via WAL mode). Each method opens its own cursor.
|
single writer via WAL mode). Each method opens its own cursor.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# ── Write-contention tuning ──
|
||||||
|
# With multiple hermes processes (gateway + CLI sessions + worktree agents)
|
||||||
|
# all sharing one state.db, WAL write-lock contention causes visible TUI
|
||||||
|
# freezes. SQLite's built-in busy handler uses a deterministic sleep
|
||||||
|
# schedule that causes convoy effects under high concurrency.
|
||||||
|
#
|
||||||
|
# Instead, we keep the SQLite timeout short (1s) and handle retries at the
|
||||||
|
# application level with random jitter, which naturally staggers competing
|
||||||
|
# writers and avoids the convoy.
|
||||||
|
_WRITE_MAX_RETRIES = 15
|
||||||
|
_WRITE_RETRY_MIN_S = 0.020 # 20ms
|
||||||
|
_WRITE_RETRY_MAX_S = 0.150 # 150ms
|
||||||
|
# Attempt a PASSIVE WAL checkpoint every N successful writes.
|
||||||
|
_CHECKPOINT_EVERY_N_WRITES = 50
|
||||||
|
|
||||||
def __init__(self, db_path: Path = None):
|
def __init__(self, db_path: Path = None):
|
||||||
self.db_path = db_path or DEFAULT_DB_PATH
|
self.db_path = db_path or DEFAULT_DB_PATH
|
||||||
self.db_path.parent.mkdir(parents=True, exist_ok=True)
|
self.db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
|
self._write_count = 0
|
||||||
self._conn = sqlite3.connect(
|
self._conn = sqlite3.connect(
|
||||||
str(self.db_path),
|
str(self.db_path),
|
||||||
check_same_thread=False,
|
check_same_thread=False,
|
||||||
# 30s gives the WAL writer (CLI or gateway) time to finish a batch
|
# Short timeout — application-level retry with random jitter
|
||||||
# flush before the concurrent reader/writer gives up. 10s was too
|
# handles contention instead of sitting in SQLite's internal
|
||||||
# short when the CLI is doing frequent memory flushes.
|
# busy handler for up to 30s.
|
||||||
timeout=30.0,
|
timeout=1.0,
|
||||||
|
# Autocommit mode: Python's default isolation_level="" auto-starts
|
||||||
|
# transactions on DML, which conflicts with our explicit
|
||||||
|
# BEGIN IMMEDIATE. None = we manage transactions ourselves.
|
||||||
|
isolation_level=None,
|
||||||
)
|
)
|
||||||
self._conn.row_factory = sqlite3.Row
|
self._conn.row_factory = sqlite3.Row
|
||||||
self._conn.execute("PRAGMA journal_mode=WAL")
|
self._conn.execute("PRAGMA journal_mode=WAL")
|
||||||
@@ -135,6 +160,96 @@ class SessionDB:
|
|||||||
|
|
||||||
self._init_schema()
|
self._init_schema()
|
||||||
|
|
||||||
|
# ── Core write helper ──
|
||||||
|
|
||||||
|
def _execute_write(self, fn: Callable[[sqlite3.Connection], T]) -> T:
|
||||||
|
"""Execute a write transaction with BEGIN IMMEDIATE and jitter retry.
|
||||||
|
|
||||||
|
*fn* receives the connection and should perform INSERT/UPDATE/DELETE
|
||||||
|
statements. The caller must NOT call ``commit()`` — that's handled
|
||||||
|
here after *fn* returns.
|
||||||
|
|
||||||
|
BEGIN IMMEDIATE acquires the WAL write lock at transaction start
|
||||||
|
(not at commit time), so lock contention surfaces immediately.
|
||||||
|
On ``database is locked``, we release the Python lock, sleep a
|
||||||
|
random 20-150ms, and retry — breaking the convoy pattern that
|
||||||
|
SQLite's built-in deterministic backoff creates.
|
||||||
|
|
||||||
|
Returns whatever *fn* returns.
|
||||||
|
"""
|
||||||
|
last_err: Optional[Exception] = None
|
||||||
|
for attempt in range(self._WRITE_MAX_RETRIES):
|
||||||
|
try:
|
||||||
|
with self._lock:
|
||||||
|
self._conn.execute("BEGIN IMMEDIATE")
|
||||||
|
try:
|
||||||
|
result = fn(self._conn)
|
||||||
|
self._conn.commit()
|
||||||
|
except BaseException:
|
||||||
|
try:
|
||||||
|
self._conn.rollback()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
raise
|
||||||
|
# Success — periodic best-effort checkpoint.
|
||||||
|
self._write_count += 1
|
||||||
|
if self._write_count % self._CHECKPOINT_EVERY_N_WRITES == 0:
|
||||||
|
self._try_wal_checkpoint()
|
||||||
|
return result
|
||||||
|
except sqlite3.OperationalError as exc:
|
||||||
|
err_msg = str(exc).lower()
|
||||||
|
if "locked" in err_msg or "busy" in err_msg:
|
||||||
|
last_err = exc
|
||||||
|
if attempt < self._WRITE_MAX_RETRIES - 1:
|
||||||
|
jitter = random.uniform(
|
||||||
|
self._WRITE_RETRY_MIN_S,
|
||||||
|
self._WRITE_RETRY_MAX_S,
|
||||||
|
)
|
||||||
|
time.sleep(jitter)
|
||||||
|
continue
|
||||||
|
# Non-lock error or retries exhausted — propagate.
|
||||||
|
raise
|
||||||
|
# Retries exhausted (shouldn't normally reach here).
|
||||||
|
raise last_err or sqlite3.OperationalError(
|
||||||
|
"database is locked after max retries"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _try_wal_checkpoint(self) -> None:
|
||||||
|
"""Best-effort PASSIVE WAL checkpoint. Never blocks, never raises.
|
||||||
|
|
||||||
|
Flushes committed WAL frames back into the main DB file for any
|
||||||
|
frames that no other connection currently needs. Keeps the WAL
|
||||||
|
from growing unbounded when many processes hold persistent
|
||||||
|
connections.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
with self._lock:
|
||||||
|
result = self._conn.execute(
|
||||||
|
"PRAGMA wal_checkpoint(PASSIVE)"
|
||||||
|
).fetchone()
|
||||||
|
if result and result[1] > 0:
|
||||||
|
logger.debug(
|
||||||
|
"WAL checkpoint: %d/%d pages checkpointed",
|
||||||
|
result[2], result[1],
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass # Best effort — never fatal.
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""Close the database connection.
|
||||||
|
|
||||||
|
Attempts a PASSIVE WAL checkpoint first so that exiting processes
|
||||||
|
help keep the WAL file from growing unbounded.
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
if self._conn:
|
||||||
|
try:
|
||||||
|
self._conn.execute("PRAGMA wal_checkpoint(PASSIVE)")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
self._conn.close()
|
||||||
|
self._conn = None
|
||||||
|
|
||||||
def _init_schema(self):
|
def _init_schema(self):
|
||||||
"""Create tables and FTS if they don't exist, run migrations."""
|
"""Create tables and FTS if they don't exist, run migrations."""
|
||||||
cursor = self._conn.cursor()
|
cursor = self._conn.cursor()
|
||||||
@@ -256,8 +371,8 @@ class SessionDB:
|
|||||||
parent_session_id: str = None,
|
parent_session_id: str = None,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Create a new session record. Returns the session_id."""
|
"""Create a new session record. Returns the session_id."""
|
||||||
with self._lock:
|
def _do(conn):
|
||||||
self._conn.execute(
|
conn.execute(
|
||||||
"""INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config,
|
"""INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config,
|
||||||
system_prompt, parent_session_id, started_at)
|
system_prompt, parent_session_id, started_at)
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||||
@@ -272,35 +387,35 @@ class SessionDB:
|
|||||||
time.time(),
|
time.time(),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
self._conn.commit()
|
self._execute_write(_do)
|
||||||
return session_id
|
return session_id
|
||||||
|
|
||||||
def end_session(self, session_id: str, end_reason: str) -> None:
|
def end_session(self, session_id: str, end_reason: str) -> None:
|
||||||
"""Mark a session as ended."""
|
"""Mark a session as ended."""
|
||||||
with self._lock:
|
def _do(conn):
|
||||||
self._conn.execute(
|
conn.execute(
|
||||||
"UPDATE sessions SET ended_at = ?, end_reason = ? WHERE id = ?",
|
"UPDATE sessions SET ended_at = ?, end_reason = ? WHERE id = ?",
|
||||||
(time.time(), end_reason, session_id),
|
(time.time(), end_reason, session_id),
|
||||||
)
|
)
|
||||||
self._conn.commit()
|
self._execute_write(_do)
|
||||||
|
|
||||||
def reopen_session(self, session_id: str) -> None:
|
def reopen_session(self, session_id: str) -> None:
|
||||||
"""Clear ended_at/end_reason so a session can be resumed."""
|
"""Clear ended_at/end_reason so a session can be resumed."""
|
||||||
with self._lock:
|
def _do(conn):
|
||||||
self._conn.execute(
|
conn.execute(
|
||||||
"UPDATE sessions SET ended_at = NULL, end_reason = NULL WHERE id = ?",
|
"UPDATE sessions SET ended_at = NULL, end_reason = NULL WHERE id = ?",
|
||||||
(session_id,),
|
(session_id,),
|
||||||
)
|
)
|
||||||
self._conn.commit()
|
self._execute_write(_do)
|
||||||
|
|
||||||
def update_system_prompt(self, session_id: str, system_prompt: str) -> None:
|
def update_system_prompt(self, session_id: str, system_prompt: str) -> None:
|
||||||
"""Store the full assembled system prompt snapshot."""
|
"""Store the full assembled system prompt snapshot."""
|
||||||
with self._lock:
|
def _do(conn):
|
||||||
self._conn.execute(
|
conn.execute(
|
||||||
"UPDATE sessions SET system_prompt = ? WHERE id = ?",
|
"UPDATE sessions SET system_prompt = ? WHERE id = ?",
|
||||||
(system_prompt, session_id),
|
(system_prompt, session_id),
|
||||||
)
|
)
|
||||||
self._conn.commit()
|
self._execute_write(_do)
|
||||||
|
|
||||||
def update_token_counts(
|
def update_token_counts(
|
||||||
self,
|
self,
|
||||||
@@ -370,29 +485,27 @@ class SessionDB:
|
|||||||
billing_mode = COALESCE(billing_mode, ?),
|
billing_mode = COALESCE(billing_mode, ?),
|
||||||
model = COALESCE(model, ?)
|
model = COALESCE(model, ?)
|
||||||
WHERE id = ?"""
|
WHERE id = ?"""
|
||||||
with self._lock:
|
params = (
|
||||||
self._conn.execute(
|
input_tokens,
|
||||||
sql,
|
output_tokens,
|
||||||
(
|
cache_read_tokens,
|
||||||
input_tokens,
|
cache_write_tokens,
|
||||||
output_tokens,
|
reasoning_tokens,
|
||||||
cache_read_tokens,
|
estimated_cost_usd,
|
||||||
cache_write_tokens,
|
actual_cost_usd,
|
||||||
reasoning_tokens,
|
actual_cost_usd,
|
||||||
estimated_cost_usd,
|
cost_status,
|
||||||
actual_cost_usd,
|
cost_source,
|
||||||
actual_cost_usd,
|
pricing_version,
|
||||||
cost_status,
|
billing_provider,
|
||||||
cost_source,
|
billing_base_url,
|
||||||
pricing_version,
|
billing_mode,
|
||||||
billing_provider,
|
model,
|
||||||
billing_base_url,
|
session_id,
|
||||||
billing_mode,
|
)
|
||||||
model,
|
def _do(conn):
|
||||||
session_id,
|
conn.execute(sql, params)
|
||||||
),
|
self._execute_write(_do)
|
||||||
)
|
|
||||||
self._conn.commit()
|
|
||||||
|
|
||||||
def ensure_session(
|
def ensure_session(
|
||||||
self,
|
self,
|
||||||
@@ -406,14 +519,14 @@ class SessionDB:
|
|||||||
create_session() call (e.g. transient SQLite lock at agent startup).
|
create_session() call (e.g. transient SQLite lock at agent startup).
|
||||||
INSERT OR IGNORE is safe to call even when the row already exists.
|
INSERT OR IGNORE is safe to call even when the row already exists.
|
||||||
"""
|
"""
|
||||||
with self._lock:
|
def _do(conn):
|
||||||
self._conn.execute(
|
conn.execute(
|
||||||
"""INSERT OR IGNORE INTO sessions
|
"""INSERT OR IGNORE INTO sessions
|
||||||
(id, source, model, started_at)
|
(id, source, model, started_at)
|
||||||
VALUES (?, ?, ?, ?)""",
|
VALUES (?, ?, ?, ?)""",
|
||||||
(session_id, source, model, time.time()),
|
(session_id, source, model, time.time()),
|
||||||
)
|
)
|
||||||
self._conn.commit()
|
self._execute_write(_do)
|
||||||
|
|
||||||
def set_token_counts(
|
def set_token_counts(
|
||||||
self,
|
self,
|
||||||
@@ -439,8 +552,8 @@ class SessionDB:
|
|||||||
conversation run (e.g. the gateway, where the cached agent's
|
conversation run (e.g. the gateway, where the cached agent's
|
||||||
session_prompt_tokens already reflects the running total).
|
session_prompt_tokens already reflects the running total).
|
||||||
"""
|
"""
|
||||||
with self._lock:
|
def _do(conn):
|
||||||
self._conn.execute(
|
conn.execute(
|
||||||
"""UPDATE sessions SET
|
"""UPDATE sessions SET
|
||||||
input_tokens = ?,
|
input_tokens = ?,
|
||||||
output_tokens = ?,
|
output_tokens = ?,
|
||||||
@@ -479,7 +592,7 @@ class SessionDB:
|
|||||||
session_id,
|
session_id,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
self._conn.commit()
|
self._execute_write(_do)
|
||||||
|
|
||||||
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
|
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
|
||||||
"""Get a session by ID."""
|
"""Get a session by ID."""
|
||||||
@@ -573,10 +686,10 @@ class SessionDB:
|
|||||||
Empty/whitespace-only strings are normalized to None (clearing the title).
|
Empty/whitespace-only strings are normalized to None (clearing the title).
|
||||||
"""
|
"""
|
||||||
title = self.sanitize_title(title)
|
title = self.sanitize_title(title)
|
||||||
with self._lock:
|
def _do(conn):
|
||||||
if title:
|
if title:
|
||||||
# Check uniqueness (allow the same session to keep its own title)
|
# Check uniqueness (allow the same session to keep its own title)
|
||||||
cursor = self._conn.execute(
|
cursor = conn.execute(
|
||||||
"SELECT id FROM sessions WHERE title = ? AND id != ?",
|
"SELECT id FROM sessions WHERE title = ? AND id != ?",
|
||||||
(title, session_id),
|
(title, session_id),
|
||||||
)
|
)
|
||||||
@@ -585,12 +698,12 @@ class SessionDB:
|
|||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Title '{title}' is already in use by session {conflict['id']}"
|
f"Title '{title}' is already in use by session {conflict['id']}"
|
||||||
)
|
)
|
||||||
cursor = self._conn.execute(
|
cursor = conn.execute(
|
||||||
"UPDATE sessions SET title = ? WHERE id = ?",
|
"UPDATE sessions SET title = ? WHERE id = ?",
|
||||||
(title, session_id),
|
(title, session_id),
|
||||||
)
|
)
|
||||||
self._conn.commit()
|
return cursor.rowcount
|
||||||
rowcount = cursor.rowcount
|
rowcount = self._execute_write(_do)
|
||||||
return rowcount > 0
|
return rowcount > 0
|
||||||
|
|
||||||
def get_session_title(self, session_id: str) -> Optional[str]:
|
def get_session_title(self, session_id: str) -> Optional[str]:
|
||||||
@@ -762,17 +875,24 @@ class SessionDB:
|
|||||||
Also increments the session's message_count (and tool_call_count
|
Also increments the session's message_count (and tool_call_count
|
||||||
if role is 'tool' or tool_calls is present).
|
if role is 'tool' or tool_calls is present).
|
||||||
"""
|
"""
|
||||||
with self._lock:
|
# Serialize structured fields to JSON before entering the write txn
|
||||||
# Serialize structured fields to JSON for storage
|
reasoning_details_json = (
|
||||||
reasoning_details_json = (
|
json.dumps(reasoning_details)
|
||||||
json.dumps(reasoning_details)
|
if reasoning_details else None
|
||||||
if reasoning_details else None
|
)
|
||||||
)
|
codex_items_json = (
|
||||||
codex_items_json = (
|
json.dumps(codex_reasoning_items)
|
||||||
json.dumps(codex_reasoning_items)
|
if codex_reasoning_items else None
|
||||||
if codex_reasoning_items else None
|
)
|
||||||
)
|
tool_calls_json = json.dumps(tool_calls) if tool_calls else None
|
||||||
cursor = self._conn.execute(
|
|
||||||
|
# Pre-compute tool call count
|
||||||
|
num_tool_calls = 0
|
||||||
|
if tool_calls is not None:
|
||||||
|
num_tool_calls = len(tool_calls) if isinstance(tool_calls, list) else 1
|
||||||
|
|
||||||
|
def _do(conn):
|
||||||
|
cursor = conn.execute(
|
||||||
"""INSERT INTO messages (session_id, role, content, tool_call_id,
|
"""INSERT INTO messages (session_id, role, content, tool_call_id,
|
||||||
tool_calls, tool_name, timestamp, token_count, finish_reason,
|
tool_calls, tool_name, timestamp, token_count, finish_reason,
|
||||||
reasoning, reasoning_details, codex_reasoning_items)
|
reasoning, reasoning_details, codex_reasoning_items)
|
||||||
@@ -782,7 +902,7 @@ class SessionDB:
|
|||||||
role,
|
role,
|
||||||
content,
|
content,
|
||||||
tool_call_id,
|
tool_call_id,
|
||||||
json.dumps(tool_calls) if tool_calls else None,
|
tool_calls_json,
|
||||||
tool_name,
|
tool_name,
|
||||||
time.time(),
|
time.time(),
|
||||||
token_count,
|
token_count,
|
||||||
@@ -795,25 +915,20 @@ class SessionDB:
|
|||||||
msg_id = cursor.lastrowid
|
msg_id = cursor.lastrowid
|
||||||
|
|
||||||
# Update counters
|
# Update counters
|
||||||
# Count actual tool calls from the tool_calls list (not from tool responses).
|
|
||||||
# A single assistant message can contain multiple parallel tool calls.
|
|
||||||
num_tool_calls = 0
|
|
||||||
if tool_calls is not None:
|
|
||||||
num_tool_calls = len(tool_calls) if isinstance(tool_calls, list) else 1
|
|
||||||
if num_tool_calls > 0:
|
if num_tool_calls > 0:
|
||||||
self._conn.execute(
|
conn.execute(
|
||||||
"""UPDATE sessions SET message_count = message_count + 1,
|
"""UPDATE sessions SET message_count = message_count + 1,
|
||||||
tool_call_count = tool_call_count + ? WHERE id = ?""",
|
tool_call_count = tool_call_count + ? WHERE id = ?""",
|
||||||
(num_tool_calls, session_id),
|
(num_tool_calls, session_id),
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
self._conn.execute(
|
conn.execute(
|
||||||
"UPDATE sessions SET message_count = message_count + 1 WHERE id = ?",
|
"UPDATE sessions SET message_count = message_count + 1 WHERE id = ?",
|
||||||
(session_id,),
|
(session_id,),
|
||||||
)
|
)
|
||||||
|
return msg_id
|
||||||
|
|
||||||
self._conn.commit()
|
return self._execute_write(_do)
|
||||||
return msg_id
|
|
||||||
|
|
||||||
def get_messages(self, session_id: str) -> List[Dict[str, Any]]:
|
def get_messages(self, session_id: str) -> List[Dict[str, Any]]:
|
||||||
"""Load all messages for a session, ordered by timestamp."""
|
"""Load all messages for a session, ordered by timestamp."""
|
||||||
@@ -1107,54 +1222,53 @@ class SessionDB:
|
|||||||
|
|
||||||
def clear_messages(self, session_id: str) -> None:
|
def clear_messages(self, session_id: str) -> None:
|
||||||
"""Delete all messages for a session and reset its counters."""
|
"""Delete all messages for a session and reset its counters."""
|
||||||
with self._lock:
|
def _do(conn):
|
||||||
self._conn.execute(
|
conn.execute(
|
||||||
"DELETE FROM messages WHERE session_id = ?", (session_id,)
|
"DELETE FROM messages WHERE session_id = ?", (session_id,)
|
||||||
)
|
)
|
||||||
self._conn.execute(
|
conn.execute(
|
||||||
"UPDATE sessions SET message_count = 0, tool_call_count = 0 WHERE id = ?",
|
"UPDATE sessions SET message_count = 0, tool_call_count = 0 WHERE id = ?",
|
||||||
(session_id,),
|
(session_id,),
|
||||||
)
|
)
|
||||||
self._conn.commit()
|
self._execute_write(_do)
|
||||||
|
|
||||||
def delete_session(self, session_id: str) -> bool:
|
def delete_session(self, session_id: str) -> bool:
|
||||||
"""Delete a session and all its messages. Returns True if found."""
|
"""Delete a session and all its messages. Returns True if found."""
|
||||||
with self._lock:
|
def _do(conn):
|
||||||
cursor = self._conn.execute(
|
cursor = conn.execute(
|
||||||
"SELECT COUNT(*) FROM sessions WHERE id = ?", (session_id,)
|
"SELECT COUNT(*) FROM sessions WHERE id = ?", (session_id,)
|
||||||
)
|
)
|
||||||
if cursor.fetchone()[0] == 0:
|
if cursor.fetchone()[0] == 0:
|
||||||
return False
|
return False
|
||||||
self._conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
|
conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
|
||||||
self._conn.execute("DELETE FROM sessions WHERE id = ?", (session_id,))
|
conn.execute("DELETE FROM sessions WHERE id = ?", (session_id,))
|
||||||
self._conn.commit()
|
|
||||||
return True
|
return True
|
||||||
|
return self._execute_write(_do)
|
||||||
|
|
||||||
def prune_sessions(self, older_than_days: int = 90, source: str = None) -> int:
|
def prune_sessions(self, older_than_days: int = 90, source: str = None) -> int:
|
||||||
"""
|
"""
|
||||||
Delete sessions older than N days. Returns count of deleted sessions.
|
Delete sessions older than N days. Returns count of deleted sessions.
|
||||||
Only prunes ended sessions (not active ones).
|
Only prunes ended sessions (not active ones).
|
||||||
"""
|
"""
|
||||||
import time as _time
|
cutoff = time.time() - (older_than_days * 86400)
|
||||||
cutoff = _time.time() - (older_than_days * 86400)
|
|
||||||
|
|
||||||
with self._lock:
|
def _do(conn):
|
||||||
if source:
|
if source:
|
||||||
cursor = self._conn.execute(
|
cursor = conn.execute(
|
||||||
"""SELECT id FROM sessions
|
"""SELECT id FROM sessions
|
||||||
WHERE started_at < ? AND ended_at IS NOT NULL AND source = ?""",
|
WHERE started_at < ? AND ended_at IS NOT NULL AND source = ?""",
|
||||||
(cutoff, source),
|
(cutoff, source),
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
cursor = self._conn.execute(
|
cursor = conn.execute(
|
||||||
"SELECT id FROM sessions WHERE started_at < ? AND ended_at IS NOT NULL",
|
"SELECT id FROM sessions WHERE started_at < ? AND ended_at IS NOT NULL",
|
||||||
(cutoff,),
|
(cutoff,),
|
||||||
)
|
)
|
||||||
session_ids = [row["id"] for row in cursor.fetchall()]
|
session_ids = [row["id"] for row in cursor.fetchall()]
|
||||||
|
|
||||||
for sid in session_ids:
|
for sid in session_ids:
|
||||||
self._conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
|
conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
|
||||||
self._conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
|
conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
|
||||||
|
return len(session_ids)
|
||||||
|
|
||||||
self._conn.commit()
|
return self._execute_write(_do)
|
||||||
return len(session_ids)
|
|
||||||
|
|||||||
Reference in New Issue
Block a user