Compare commits

...

2 Commits

Author SHA1 Message Date
Teknium
558971f44d fix: lazy-init SessionDB on adapter instance instead of per-request
Reuse a single SessionDB across requests by caching on self._session_db
with lazy initialization. Avoids creating a new SQLite connection per
request when X-Hermes-Session-Id is used. Updated tests to set
adapter._session_db directly instead of patching the constructor.
2026-04-01 11:29:20 -07:00
txchen
7a3ccea42e feat(api-server): support X-Hermes-Session-Id header for session continuity
Allow callers to pass X-Hermes-Session-Id in request headers to continue
an existing conversation. When provided, history is loaded from SessionDB
instead of the request body, and the session_id is echoed in the response
header. Without the header, existing behavior is preserved (new uuid per
request).

This enables web UI clients to maintain thread continuity without modifying
any session state themselves — the same mechanism the gateway uses for IM
platforms (Telegram, Discord, etc.).
2026-04-01 11:19:48 -07:00
2 changed files with 131 additions and 5 deletions

View File

@@ -2,7 +2,7 @@
OpenAI-compatible API server platform adapter. OpenAI-compatible API server platform adapter.
Exposes an HTTP server with endpoints: Exposes an HTTP server with endpoints:
- POST /v1/chat/completions — OpenAI Chat Completions format (stateless) - POST /v1/chat/completions — OpenAI Chat Completions format (stateless; opt-in session continuity via X-Hermes-Session-Id header)
- POST /v1/responses — OpenAI Responses API format (stateful via previous_response_id) - POST /v1/responses — OpenAI Responses API format (stateful via previous_response_id)
- GET /v1/responses/{response_id} — Retrieve a stored response - GET /v1/responses/{response_id} — Retrieve a stored response
- DELETE /v1/responses/{response_id} — Delete a stored response - DELETE /v1/responses/{response_id} — Delete a stored response
@@ -300,6 +300,7 @@ class APIServerAdapter(BasePlatformAdapter):
self._runner: Optional["web.AppRunner"] = None self._runner: Optional["web.AppRunner"] = None
self._site: Optional["web.TCPSite"] = None self._site: Optional["web.TCPSite"] = None
self._response_store = ResponseStore() self._response_store = ResponseStore()
self._session_db: Optional[Any] = None # Lazy-init SessionDB for session continuity
@staticmethod @staticmethod
def _parse_cors_origins(value: Any) -> tuple[str, ...]: def _parse_cors_origins(value: Any) -> tuple[str, ...]:
@@ -496,7 +497,23 @@ class APIServerAdapter(BasePlatformAdapter):
status=400, status=400,
) )
# Allow caller to continue an existing session by passing X-Hermes-Session-Id.
# When provided, history is loaded from state.db instead of from the request body.
provided_session_id = request.headers.get("X-Hermes-Session-Id", "").strip()
if provided_session_id:
session_id = provided_session_id
try:
if self._session_db is None:
from hermes_state import SessionDB
self._session_db = SessionDB()
history = self._session_db.get_messages_as_conversation(session_id)
except Exception as e:
logger.warning("Failed to load session history for %s: %s", session_id, e)
history = []
else:
session_id = str(uuid.uuid4()) session_id = str(uuid.uuid4())
# history already set from request body above
completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}" completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
model_name = body.get("model", "hermes-agent") model_name = body.get("model", "hermes-agent")
created = int(time.time()) created = int(time.time())
@@ -540,7 +557,7 @@ class APIServerAdapter(BasePlatformAdapter):
return await self._write_sse_chat_completion( return await self._write_sse_chat_completion(
request, completion_id, model_name, created, _stream_q, request, completion_id, model_name, created, _stream_q,
agent_task, agent_ref, agent_task, agent_ref, session_id=session_id,
) )
# Non-streaming: run the agent (with optional Idempotency-Key) # Non-streaming: run the agent (with optional Idempotency-Key)
@@ -599,11 +616,11 @@ class APIServerAdapter(BasePlatformAdapter):
}, },
} }
return web.json_response(response_data) return web.json_response(response_data, headers={"X-Hermes-Session-Id": session_id})
async def _write_sse_chat_completion( async def _write_sse_chat_completion(
self, request: "web.Request", completion_id: str, model: str, self, request: "web.Request", completion_id: str, model: str,
created: int, stream_q, agent_task, agent_ref=None, created: int, stream_q, agent_task, agent_ref=None, session_id: str = None,
) -> "web.StreamResponse": ) -> "web.StreamResponse":
"""Write real streaming SSE from agent's stream_delta_callback queue. """Write real streaming SSE from agent's stream_delta_callback queue.
@@ -620,6 +637,8 @@ class APIServerAdapter(BasePlatformAdapter):
cors = self._cors_headers_for_origin(origin) if origin else None cors = self._cors_headers_for_origin(origin) if origin else None
if cors: if cors:
sse_headers.update(cors) sse_headers.update(cors)
if session_id:
sse_headers["X-Hermes-Session-Id"] = session_id
response = web.StreamResponse(status=200, headers=sse_headers) response = web.StreamResponse(status=200, headers=sse_headers)
await response.prepare(request) await response.prepare(request)

View File

@@ -1576,3 +1576,110 @@ class TestConversationParameter:
assert resp.status == 200 assert resp.status == 200
# Conversation mapping should NOT be set since store=false # Conversation mapping should NOT be set since store=false
assert adapter._response_store.get_conversation("ephemeral-chat") is None assert adapter._response_store.get_conversation("ephemeral-chat") is None
# ---------------------------------------------------------------------------
# X-Hermes-Session-Id header (session continuity)
# ---------------------------------------------------------------------------
class TestSessionIdHeader:
@pytest.mark.asyncio
async def test_new_session_response_includes_session_id_header(self, adapter):
"""Without X-Hermes-Session-Id, a new session is created and returned in the header."""
mock_result = {"final_response": "Hello!", "messages": [], "api_calls": 1}
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run:
mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0})
resp = await cli.post(
"/v1/chat/completions",
json={"model": "hermes-agent", "messages": [{"role": "user", "content": "Hi"}]},
)
assert resp.status == 200
assert resp.headers.get("X-Hermes-Session-Id") is not None
@pytest.mark.asyncio
async def test_provided_session_id_is_used_and_echoed(self, adapter):
"""When X-Hermes-Session-Id is provided, it's passed to the agent and echoed in the response."""
mock_result = {"final_response": "Continuing!", "messages": [], "api_calls": 1}
mock_db = MagicMock()
mock_db.get_messages_as_conversation.return_value = [
{"role": "user", "content": "previous message"},
{"role": "assistant", "content": "previous reply"},
]
adapter._session_db = mock_db
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run:
mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0})
resp = await cli.post(
"/v1/chat/completions",
headers={"X-Hermes-Session-Id": "my-session-123"},
json={"model": "hermes-agent", "messages": [{"role": "user", "content": "Continue"}]},
)
assert resp.status == 200
assert resp.headers.get("X-Hermes-Session-Id") == "my-session-123"
call_kwargs = mock_run.call_args.kwargs
assert call_kwargs["session_id"] == "my-session-123"
@pytest.mark.asyncio
async def test_provided_session_id_loads_history_from_db(self, adapter):
"""When X-Hermes-Session-Id is provided, history comes from SessionDB not request body."""
mock_result = {"final_response": "OK", "messages": [], "api_calls": 1}
db_history = [
{"role": "user", "content": "stored message 1"},
{"role": "assistant", "content": "stored reply 1"},
]
mock_db = MagicMock()
mock_db.get_messages_as_conversation.return_value = db_history
adapter._session_db = mock_db
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run:
mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0})
resp = await cli.post(
"/v1/chat/completions",
headers={"X-Hermes-Session-Id": "existing-session"},
# Request body has different history — should be ignored
json={
"model": "hermes-agent",
"messages": [
{"role": "user", "content": "old msg from client"},
{"role": "assistant", "content": "old reply from client"},
{"role": "user", "content": "new question"},
],
},
)
assert resp.status == 200
call_kwargs = mock_run.call_args.kwargs
# History must come from DB, not from the request body
assert call_kwargs["conversation_history"] == db_history
assert call_kwargs["user_message"] == "new question"
@pytest.mark.asyncio
async def test_db_failure_falls_back_to_empty_history(self, adapter):
"""If SessionDB raises, history falls back to empty and request still succeeds."""
mock_result = {"final_response": "OK", "messages": [], "api_calls": 1}
# Simulate DB failure: _session_db is None and SessionDB() constructor raises
adapter._session_db = None
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run, \
patch("hermes_state.SessionDB", side_effect=Exception("DB unavailable")):
mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0})
resp = await cli.post(
"/v1/chat/completions",
headers={"X-Hermes-Session-Id": "some-session"},
json={"model": "hermes-agent", "messages": [{"role": "user", "content": "Hi"}]},
)
assert resp.status == 200
call_kwargs = mock_run.call_args.kwargs
assert call_kwargs["conversation_history"] == []
assert call_kwargs["session_id"] == "some-session"