Compare commits

...

7 Commits

Author SHA1 Message Date
teknium1
550402116d docs: comprehensive documentation for API server, streaming, and Open WebUI
- website/docs/user-guide/features/api-server.md — full API server docs:
  endpoints (chat completions, responses, models), system prompt handling,
  auth, config, compatible frontends matrix, limitations
- website/docs/user-guide/features/streaming.md — streaming docs:
  per-platform support matrix, architecture, config reference,
  troubleshooting, interaction with tools/compression/interrupts
- website/docs/user-guide/messaging/open-webui.md — pre-existing
  step-by-step Open WebUI integration guide
- website/docs/user-guide/messaging/index.md — updated to include
  API server in architecture diagram and description
2026-03-10 17:01:44 -07:00
teknium1
5a426e084a feat: add streaming LLM response support across all platforms
Token-by-token streaming of LLM responses, disabled by default.
Enable via streaming.enabled: true in config.yaml or
HERMES_STREAMING_ENABLED=true env var.

Core (run_agent.py):
- stream_callback parameter on AIAgent
- _run_streaming_chat_completion() for Chat Completions streaming
- _run_codex_stream() now emits tokens via callback
- _interruptible_api_call routes to streaming when callback is set
- Graceful fallback to non-streaming on any error

Gateway (gateway/run.py):
- Read streaming config (master switch + per-platform overrides)
- Queue-based callback bridging agent thread to async event loop
- stream_preview task: progressive message editing with cursor
- Skip normal send when streaming already delivered the response
- Thread-safe, respects platform rate limits (1.5s edit interval)

API Server (gateway/platforms/api_server.py):
- Real token-by-token SSE when stream=true (replaces pseudo-streaming)
- stream_callback wired through _create_agent and _run_agent
- Background agent task + queue for concurrent streaming

Config:
- streaming.enabled (master switch, default: false)
- Per-platform: streaming.telegram, streaming.discord, etc.
- HERMES_STREAMING_ENABLED env var override
2026-03-10 16:29:26 -07:00
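The queue-based bridging described above can be sketched as follows — a worker thread standing in for the agent pushes tokens into a thread-safe queue, and the async event loop drains it without blocking (all names here are illustrative, not the actual gateway code):

```python
import asyncio
import queue
import threading

def bridge_demo() -> str:
    # Thread-safe queue carrying tokens from the agent thread to the event loop.
    token_q: "queue.Queue" = queue.Queue()

    def agent_thread() -> None:
        # Stand-in for the agent's stream_callback; None marks end of stream.
        for tok in ("Hel", "lo", None):
            token_q.put(tok)

    async def consume() -> str:
        loop = asyncio.get_running_loop()
        parts = []
        while True:
            # Blocking Queue.get runs in the default executor so the loop stays free.
            tok = await loop.run_in_executor(None, token_q.get)
            if tok is None:
                break
            parts.append(tok)
        return "".join(parts)

    threading.Thread(target=agent_thread, daemon=True).start()
    return asyncio.run(consume())
```

The real gateway additionally throttles message edits (the 1.5s interval above); this sketch only shows the thread-to-loop handoff.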
teknium1
e00335fa5f docs: add Open WebUI integration guide
Step-by-step guide for connecting Open WebUI to hermes-agent via the
API server. Covers Docker setup, Admin UI config, Chat Completions
vs Responses API modes, troubleshooting, and Linux Docker networking.
2026-03-10 16:05:17 -07:00
teknium1
3a64df8873 feat: add pseudo-streaming SSE + conversation parameter
- stream=true now returns SSE (role chunk → content chunk → finish → [DONE])
  instead of 501. Not token-by-token but compatible with frontends like
  Open WebUI that require SSE format.
- Add conversation parameter for named session chaining (like /title).
  Mutually exclusive with previous_response_id.
2026-03-10 16:05:17 -07:00
teknium1
93a22a4245 feat: add conversation parameter + named session chaining
Like /title for sessions — clients can name conversations instead of
tracking response IDs manually:

  POST /v1/responses {input: 'hi', conversation: 'my-project'}
  POST /v1/responses {input: 'next step', conversation: 'my-project'}

Server automatically chains to the latest response in that conversation.
Mutually exclusive with previous_response_id (returns 400 if both set).
Not stored if store=false.

5 new tests (72 total).
2026-03-10 16:05:17 -07:00
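The chaining described above can be reduced to a name → latest-response-id index (`ConversationIndex` is a hypothetical name for illustration; the server keeps an equivalent mapping):

```python
class ConversationIndex:
    """Minimal sketch of named-session chaining: each conversation name
    maps to the ID of its most recent stored response."""

    def __init__(self):
        self._latest = {}

    def resolve(self, conversation):
        # A new name resolves to None — a fresh conversation, not an error.
        return self._latest.get(conversation)

    def record(self, conversation, response_id):
        # After each run, the name points at the newest response.
        self._latest[conversation] = response_id
```

The first POST with `conversation: 'my-project'` resolves to no previous response; the second automatically chains to whatever the first stored.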
teknium1
f46bd77ad2 feat: enhance Responses API — retrieval, deletion, tool calls, usage, CORS
- GET /v1/responses/{id} — retrieve stored responses
- DELETE /v1/responses/{id} — delete stored responses
- Tool call items in output (function_call + function_call_output)
- Real token counting from AIAgent (prompt/completion/total)
- Truncation parameter support (auto mode, max 100 messages)
- CORS middleware (Access-Control-Allow-Origin: *)
- Full response objects stored for retrieval
- 16 new tests (67 total)
2026-03-10 16:05:17 -07:00
teknium1
5e5d2bd79a feat: add OpenAI-compatible API server platform adapter (Phase 1)
Adds a new gateway platform adapter that exposes an HTTP server with
OpenAI-compatible endpoints, allowing any OpenAI-compatible frontend
(Open WebUI, LobeChat, etc.) to use hermes-agent as a backend.

Endpoints:
- POST /v1/chat/completions - OpenAI Chat Completions format (stateless)
- POST /v1/responses - OpenAI Responses API format (stateful via previous_response_id)
- GET /v1/models - lists hermes-agent as an available model
- GET /health - health check

Features:
- Bearer token auth via API_SERVER_KEY env var (unauthenticated when no key set)
- System messages/instructions become ephemeral system prompt (layered on top of core prompt)
- In-memory LRU response store for Responses API conversation chaining
- Agent runs in thread executor (run_conversation is synchronous)
- Streaming returns 501 (not yet implemented)

New files:
- gateway/platforms/api_server.py - APIServerAdapter class (~470 lines)
- tests/gateway/test_api_server.py - 51 tests covering all endpoints, auth, config

Modified files:
- gateway/config.py - Added Platform.API_SERVER enum, env var overrides, connected platforms
- gateway/run.py - Added _create_adapter() case, auth map entries, auth bypass
2026-03-10 16:05:17 -07:00
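A client request against the Phase 1 endpoints might be assembled like this (the host/port match the adapter defaults, `127.0.0.1:8642`; this sketch only builds the payload, no network call is made):

```python
import json

# Adapter defaults from the commit; adjust if API_SERVER_HOST/PORT are overridden.
API_URL = "http://127.0.0.1:8642/v1/chat/completions"

def build_chat_request(user_text: str, api_key: str = ""):
    headers = {"Content-Type": "application/json"}
    if api_key:
        # Bearer auth is only enforced when API_SERVER_KEY is configured.
        headers["Authorization"] = f"Bearer {api_key}"
    body = {
        "model": "hermes-agent",
        "messages": [{"role": "user", "content": user_text}],
    }
    return API_URL, headers, json.dumps(body)
```

Without a key the server accepts unauthenticated requests, so the `Authorization` header is simply omitted.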
11 changed files with 3423 additions and 13 deletions

View File

@@ -219,6 +219,22 @@ compression:
# Options: "auto", "openrouter", "nous", "main"
# summary_provider: "auto"
# =============================================================================
# Streaming (live token-by-token response display)
# =============================================================================
# When enabled, LLM responses stream token-by-token instead of appearing
# all at once. Supported on Telegram, Discord, Slack (via message editing)
# and the API server (via SSE). Disabled by default.
#
# streaming:
# enabled: false # Master switch (default: off)
# # Per-platform overrides:
# # telegram: true
# # discord: true
# # api_server: true
# # edit_interval: 1.5 # Seconds between message edits (default: 1.5)
# # min_tokens: 20 # Tokens before first display (default: 20)
# =============================================================================
# Auxiliary Models (Advanced — Experimental)
# =============================================================================
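The override semantics suggested by the comments above can be sketched as a small resolver (assumed behavior for illustration; the gateway's actual resolution code may differ):

```python
def streaming_enabled(cfg: dict, platform: str) -> bool:
    # A per-platform key (e.g. "telegram") overrides the master "enabled" switch.
    s = (cfg or {}).get("streaming") or {}
    return bool(s.get(platform, s.get("enabled", False)))
```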

View File

@@ -28,6 +28,7 @@ class Platform(Enum):
SLACK = "slack"
SIGNAL = "signal"
HOMEASSISTANT = "homeassistant"
API_SERVER = "api_server"
@dataclass
@@ -167,6 +168,9 @@ class GatewayConfig:
# Signal uses extra dict for config (http_url + account)
elif platform == Platform.SIGNAL and config.extra.get("http_url"):
connected.append(platform)
# API Server uses enabled flag only (no token needed)
elif platform == Platform.API_SERVER:
connected.append(platform)
return connected
def get_home_channel(self, platform: Platform) -> Optional[HomeChannel]:
@@ -420,6 +424,25 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
if hass_url:
config.platforms[Platform.HOMEASSISTANT].extra["url"] = hass_url
# API Server
api_server_enabled = os.getenv("API_SERVER_ENABLED", "").lower() in ("true", "1", "yes")
api_server_key = os.getenv("API_SERVER_KEY", "")
api_server_port = os.getenv("API_SERVER_PORT")
api_server_host = os.getenv("API_SERVER_HOST")
if api_server_enabled or api_server_key:
if Platform.API_SERVER not in config.platforms:
config.platforms[Platform.API_SERVER] = PlatformConfig()
config.platforms[Platform.API_SERVER].enabled = True
if api_server_key:
config.platforms[Platform.API_SERVER].extra["key"] = api_server_key
if api_server_port:
try:
config.platforms[Platform.API_SERVER].extra["port"] = int(api_server_port)
except ValueError:
pass
if api_server_host:
config.platforms[Platform.API_SERVER].extra["host"] = api_server_host
# Session settings
idle_minutes = os.getenv("SESSION_IDLE_MINUTES")
if idle_minutes:

View File

@@ -0,0 +1,847 @@
"""
OpenAI-compatible API server platform adapter.
Exposes an HTTP server with endpoints:
- POST /v1/chat/completions — OpenAI Chat Completions format (stateless)
- POST /v1/responses — OpenAI Responses API format (stateful via previous_response_id)
- GET /v1/responses/{response_id} — Retrieve a stored response
- DELETE /v1/responses/{response_id} — Delete a stored response
- GET /v1/models — lists hermes-agent as an available model
- GET /health — health check
Any OpenAI-compatible frontend (Open WebUI, LobeChat, etc.) can connect
to hermes-agent through this adapter.
Requires:
- aiohttp (already available in the gateway)
"""
import asyncio
import collections
import json
import logging
import os
import time
import uuid
from functools import wraps
from typing import Any, Dict, List, Optional
try:
import aiohttp
from aiohttp import web
AIOHTTP_AVAILABLE = True
except ImportError:
AIOHTTP_AVAILABLE = False
aiohttp = None # type: ignore[assignment]
web = None # type: ignore[assignment]
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
BasePlatformAdapter,
MessageEvent,
MessageType,
SendResult,
)
logger = logging.getLogger(__name__)
# Default settings
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 8642
MAX_STORED_RESPONSES = 100
def check_api_server_requirements() -> bool:
"""Check if API server dependencies are available."""
return AIOHTTP_AVAILABLE
class ResponseStore:
"""
In-memory LRU store for Responses API state.
Each stored response includes the full internal conversation history
(with tool calls and results) so it can be reconstructed on subsequent
requests via previous_response_id.
"""
def __init__(self, max_size: int = MAX_STORED_RESPONSES):
self._store: collections.OrderedDict[str, Dict[str, Any]] = collections.OrderedDict()
self._max_size = max_size
def get(self, response_id: str) -> Optional[Dict[str, Any]]:
"""Retrieve a stored response by ID (moves to end for LRU)."""
if response_id in self._store:
self._store.move_to_end(response_id)
return self._store[response_id]
return None
def put(self, response_id: str, data: Dict[str, Any]) -> None:
"""Store a response, evicting the oldest if at capacity."""
if response_id in self._store:
self._store.move_to_end(response_id)
self._store[response_id] = data
while len(self._store) > self._max_size:
self._store.popitem(last=False)
def delete(self, response_id: str) -> bool:
"""Remove a response from the store. Returns True if found and deleted."""
if response_id in self._store:
del self._store[response_id]
return True
return False
def __len__(self) -> int:
return len(self._store)
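The OrderedDict-based eviction and recency-refresh behavior above can be exercised standalone (`TinyLRU` is an illustrative reduction, not the production class):

```python
from collections import OrderedDict

class TinyLRU:
    """Minimal LRU cache using the same OrderedDict pattern as ResponseStore."""

    def __init__(self, max_size: int):
        self._d: OrderedDict = OrderedDict()
        self._max = max_size

    def put(self, key, value):
        if key in self._d:
            self._d.move_to_end(key)
        self._d[key] = value
        while len(self._d) > self._max:
            self._d.popitem(last=False)  # evict the oldest entry

    def get(self, key):
        if key in self._d:
            self._d.move_to_end(key)  # a read refreshes recency
            return self._d[key]
        return None
```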
# ---------------------------------------------------------------------------
# CORS middleware
# ---------------------------------------------------------------------------
_CORS_HEADERS = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, DELETE, OPTIONS",
"Access-Control-Allow-Headers": "Authorization, Content-Type",
}
if AIOHTTP_AVAILABLE:
@web.middleware
async def cors_middleware(request, handler):
"""Add CORS headers to every response; handle OPTIONS preflight."""
if request.method == "OPTIONS":
return web.Response(status=200, headers=_CORS_HEADERS)
response = await handler(request)
response.headers.update(_CORS_HEADERS)
return response
else:
cors_middleware = None # type: ignore[assignment]
class APIServerAdapter(BasePlatformAdapter):
"""
OpenAI-compatible HTTP API server adapter.
Runs an aiohttp web server that accepts OpenAI-format requests
and routes them through hermes-agent's AIAgent.
"""
def __init__(self, config: PlatformConfig):
super().__init__(config, Platform.API_SERVER)
extra = config.extra or {}
self._host: str = extra.get("host", os.getenv("API_SERVER_HOST", DEFAULT_HOST))
self._port: int = int(extra.get("port", os.getenv("API_SERVER_PORT", str(DEFAULT_PORT))))
self._api_key: str = extra.get("key", os.getenv("API_SERVER_KEY", ""))
self._app: Optional["web.Application"] = None
self._runner: Optional["web.AppRunner"] = None
self._site: Optional["web.TCPSite"] = None
self._response_store = ResponseStore()
# Conversation name → latest response_id mapping
self._conversations: Dict[str, str] = {}
# ------------------------------------------------------------------
# Auth helper
# ------------------------------------------------------------------
def _check_auth(self, request: "web.Request") -> Optional["web.Response"]:
"""
Validate Bearer token from Authorization header.
Returns None if auth is OK, or a 401 web.Response on failure.
If no API key is configured, all requests are allowed.
"""
if not self._api_key:
return None # No key configured — allow all (local-only use)
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header[7:].strip()
if token == self._api_key:
return None # Auth OK
return web.json_response(
{"error": {"message": "Invalid API key", "type": "invalid_request_error", "code": "invalid_api_key"}},
status=401,
)
# ------------------------------------------------------------------
# Agent creation helper
# ------------------------------------------------------------------
def _create_agent(
self,
ephemeral_system_prompt: Optional[str] = None,
session_id: Optional[str] = None,
stream_callback=None,
) -> Any:
"""
Create an AIAgent instance using the gateway's runtime config.
Uses _resolve_runtime_agent_kwargs() to pick up model, api_key,
base_url, etc. from config.yaml / env vars.
"""
from run_agent import AIAgent
from gateway.run import _resolve_runtime_agent_kwargs
runtime_kwargs = _resolve_runtime_agent_kwargs()
# Read model from env/config (same as gateway run.py)
model = os.getenv("HERMES_MODEL") or os.getenv("LLM_MODEL") or "anthropic/claude-opus-4.6"
try:
import yaml
from pathlib import Path
config_yaml_path = Path.home() / ".hermes" / "config.yaml"
if config_yaml_path.exists():
with open(config_yaml_path, encoding="utf-8") as f:
cfg = yaml.safe_load(f) or {}
model_cfg = cfg.get("model", {})
if isinstance(model_cfg, str):
model = model_cfg
elif isinstance(model_cfg, dict):
model = model_cfg.get("default", model)
except Exception:
pass
max_iterations = int(os.getenv("HERMES_MAX_ITERATIONS", "90"))
agent = AIAgent(
model=model,
**runtime_kwargs,
max_iterations=max_iterations,
quiet_mode=True,
verbose_logging=False,
ephemeral_system_prompt=ephemeral_system_prompt or None,
session_id=session_id,
platform="api_server",
stream_callback=stream_callback,
)
return agent
# ------------------------------------------------------------------
# HTTP Handlers
# ------------------------------------------------------------------
async def _handle_health(self, request: "web.Request") -> "web.Response":
"""GET /health — simple health check."""
return web.json_response({"status": "ok", "platform": "hermes-agent"})
async def _handle_models(self, request: "web.Request") -> "web.Response":
"""GET /v1/models — return hermes-agent as an available model."""
auth_err = self._check_auth(request)
if auth_err:
return auth_err
return web.json_response({
"object": "list",
"data": [
{
"id": "hermes-agent",
"object": "model",
"created": int(time.time()),
"owned_by": "hermes",
"permission": [],
"root": "hermes-agent",
"parent": None,
}
],
})
async def _handle_chat_completions(self, request: "web.Request") -> "web.Response":
"""POST /v1/chat/completions — OpenAI Chat Completions format."""
auth_err = self._check_auth(request)
if auth_err:
return auth_err
# Parse request body
try:
body = await request.json()
except Exception:  # json.JSONDecodeError and any other body-read failure
return web.json_response(
{"error": {"message": "Invalid JSON in request body", "type": "invalid_request_error"}},
status=400,
)
messages = body.get("messages")
if not messages or not isinstance(messages, list):
return web.json_response(
{"error": {"message": "Missing or invalid 'messages' field", "type": "invalid_request_error"}},
status=400,
)
stream = body.get("stream", False)
# Extract system message (becomes ephemeral system prompt layered ON TOP of core)
system_prompt = None
conversation_messages: List[Dict[str, str]] = []
for msg in messages:
role = msg.get("role", "")
content = msg.get("content", "")
if role == "system":
# Accumulate system messages
if system_prompt is None:
system_prompt = content
else:
system_prompt = system_prompt + "\n" + content
elif role in ("user", "assistant"):
conversation_messages.append({"role": role, "content": content})
# Extract the last user message as the primary input
user_message = ""
history = []
if conversation_messages:
user_message = conversation_messages[-1].get("content", "")
history = conversation_messages[:-1]
if not user_message:
return web.json_response(
{"error": {"message": "No user message found in messages", "type": "invalid_request_error"}},
status=400,
)
session_id = str(uuid.uuid4())
completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
model_name = body.get("model", "hermes-agent")
created = int(time.time())
if stream:
import queue as _q
_stream_q = _q.Queue()
def _on_api_token(delta):
_stream_q.put(delta) # None = done
# Start agent in background
agent_task = asyncio.ensure_future(self._run_agent(
user_message=user_message,
conversation_history=history,
ephemeral_system_prompt=system_prompt,
session_id=session_id,
stream_callback=_on_api_token,
))
return await self._write_real_sse_chat_completion(
request, completion_id, model_name, created, _stream_q, agent_task
)
# Non-streaming: run the agent and return full response
try:
result, usage = await self._run_agent(
user_message=user_message,
conversation_history=history,
ephemeral_system_prompt=system_prompt,
session_id=session_id,
)
except Exception as e:
logger.error("Error running agent for chat completions: %s", e, exc_info=True)
return web.json_response(
{"error": {"message": f"Internal server error: {e}", "type": "server_error"}},
status=500,
)
final_response = result.get("final_response", "")
if not final_response:
final_response = result.get("error", "(No response generated)")
response_data = {
"id": completion_id,
"object": "chat.completion",
"created": created,
"model": model_name,
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": final_response,
},
"finish_reason": "stop",
}
],
"usage": {
"prompt_tokens": usage.get("input_tokens", 0),
"completion_tokens": usage.get("output_tokens", 0),
"total_tokens": usage.get("total_tokens", 0),
},
}
return web.json_response(response_data)
async def _write_real_sse_chat_completion(
self, request: "web.Request", completion_id: str, model: str,
created: int, stream_q, agent_task,
) -> "web.StreamResponse":
"""Write real streaming SSE from agent's stream_callback queue."""
import queue as _q
response = web.StreamResponse(
status=200,
headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache"},
)
await response.prepare(request)
# Role chunk
role_chunk = {
"id": completion_id, "object": "chat.completion.chunk",
"created": created, "model": model,
"choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}],
}
await response.write(f"data: {json.dumps(role_chunk)}\n\n".encode())
# Stream content chunks as they arrive from the agent
loop = asyncio.get_running_loop()
while True:
try:
delta = await loop.run_in_executor(None, lambda: stream_q.get(timeout=0.5))
except _q.Empty:
if agent_task.done():
break
continue
if delta is None: # End of stream
break
content_chunk = {
"id": completion_id, "object": "chat.completion.chunk",
"created": created, "model": model,
"choices": [{"index": 0, "delta": {"content": delta}, "finish_reason": None}],
}
await response.write(f"data: {json.dumps(content_chunk)}\n\n".encode())
# Get usage from completed agent
usage = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
try:
result, agent_usage = await agent_task
usage = agent_usage or usage
except Exception:
pass
# Finish chunk
finish_chunk = {
"id": completion_id, "object": "chat.completion.chunk",
"created": created, "model": model,
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
"usage": {
"prompt_tokens": usage.get("input_tokens", 0),
"completion_tokens": usage.get("output_tokens", 0),
"total_tokens": usage.get("total_tokens", 0),
},
}
await response.write(f"data: {json.dumps(finish_chunk)}\n\n".encode())
await response.write(b"data: [DONE]\n\n")
return response
async def _write_sse_chat_completion(
self, request: "web.Request", completion_id: str, model: str,
created: int, content: str, usage: Dict[str, int],
) -> "web.StreamResponse":
"""Write a chat completion as SSE chunks (pseudo-streaming).
Returns the full response as three SSE events (role, content, finish)
followed by [DONE]. Not true token-by-token streaming, but compatible
with clients like Open WebUI that require SSE format.
"""
response = web.StreamResponse(
status=200,
headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache"},
)
await response.prepare(request)
# Role chunk
role_chunk = {
"id": completion_id, "object": "chat.completion.chunk",
"created": created, "model": model,
"choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}],
}
await response.write(f"data: {json.dumps(role_chunk)}\n\n".encode())
# Content chunk (full response in one chunk for now)
content_chunk = {
"id": completion_id, "object": "chat.completion.chunk",
"created": created, "model": model,
"choices": [{"index": 0, "delta": {"content": content}, "finish_reason": None}],
}
await response.write(f"data: {json.dumps(content_chunk)}\n\n".encode())
# Finish chunk
finish_chunk = {
"id": completion_id, "object": "chat.completion.chunk",
"created": created, "model": model,
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
"usage": {
"prompt_tokens": usage.get("input_tokens", 0),
"completion_tokens": usage.get("output_tokens", 0),
"total_tokens": usage.get("total_tokens", 0),
},
}
await response.write(f"data: {json.dumps(finish_chunk)}\n\n".encode())
await response.write(b"data: [DONE]\n\n")
return response
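Client-side, the chunk sequence these writers emit (role chunk → content chunk(s) → finish chunk → `[DONE]`) can be reassembled with a small parser — a sketch, not tied to any particular HTTP client:

```python
import json

def collect_sse_content(sse_text: str) -> str:
    """Reassemble assistant text from an SSE chat-completion stream."""
    parts = []
    for line in sse_text.splitlines():
        if not line.startswith("data: "):
            continue  # skip blank separator lines
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break
        chunk = json.loads(payload)
        delta = chunk["choices"][0]["delta"]
        # Role and finish chunks carry no "content" key and are skipped.
        if "content" in delta:
            parts.append(delta["content"])
    return "".join(parts)
```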
async def _handle_responses(self, request: "web.Request") -> "web.Response":
"""POST /v1/responses — OpenAI Responses API format."""
auth_err = self._check_auth(request)
if auth_err:
return auth_err
# Parse request body
try:
body = await request.json()
except Exception:  # json.JSONDecodeError and any other body-read failure
return web.json_response(
{"error": {"message": "Invalid JSON in request body", "type": "invalid_request_error"}},
status=400,
)
raw_input = body.get("input")
if raw_input is None:
return web.json_response(
{"error": {"message": "Missing 'input' field", "type": "invalid_request_error"}},
status=400,
)
instructions = body.get("instructions")
previous_response_id = body.get("previous_response_id")
conversation = body.get("conversation")
store = body.get("store", True)
# conversation and previous_response_id are mutually exclusive
if conversation and previous_response_id:
return web.json_response(
{"error": {"message": "Cannot use both 'conversation' and 'previous_response_id'", "type": "invalid_request_error"}},
status=400,
)
# Resolve conversation name to latest response_id
if conversation:
previous_response_id = self._conversations.get(conversation)
# No error if conversation doesn't exist yet — it's a new conversation
# Normalize input to message list
input_messages: List[Dict[str, str]] = []
if isinstance(raw_input, str):
input_messages = [{"role": "user", "content": raw_input}]
elif isinstance(raw_input, list):
for item in raw_input:
if isinstance(item, str):
input_messages.append({"role": "user", "content": item})
elif isinstance(item, dict):
role = item.get("role", "user")
content = item.get("content", "")
# Handle content that may be a list of content parts
if isinstance(content, list):
text_parts = []
for part in content:
if isinstance(part, dict) and part.get("type") == "input_text":
text_parts.append(part.get("text", ""))
elif isinstance(part, dict) and part.get("type") == "output_text":
text_parts.append(part.get("text", ""))
elif isinstance(part, str):
text_parts.append(part)
content = "\n".join(text_parts)
input_messages.append({"role": role, "content": content})
else:
return web.json_response(
{"error": {"message": "'input' must be a string or array", "type": "invalid_request_error"}},
status=400,
)
# Reconstruct conversation history from previous_response_id
conversation_history: List[Dict[str, str]] = []
if previous_response_id:
stored = self._response_store.get(previous_response_id)
if stored is None:
return web.json_response(
{"error": {"message": f"Previous response not found: {previous_response_id}", "type": "invalid_request_error"}},
status=404,
)
conversation_history = list(stored.get("conversation_history", []))
# If no instructions provided, carry forward from previous
if instructions is None:
instructions = stored.get("instructions")
# Append new input messages to history (all but the last become history)
for msg in input_messages[:-1]:
conversation_history.append(msg)
# Last input message is the user_message
user_message = input_messages[-1].get("content", "") if input_messages else ""
if not user_message:
return web.json_response(
{"error": {"message": "No user message found in input", "type": "invalid_request_error"}},
status=400,
)
# Truncation support
if body.get("truncation") == "auto" and len(conversation_history) > 100:
conversation_history = conversation_history[-100:]
# Run the agent
session_id = str(uuid.uuid4())
try:
result, usage = await self._run_agent(
user_message=user_message,
conversation_history=conversation_history,
ephemeral_system_prompt=instructions,
session_id=session_id,
)
except Exception as e:
logger.error("Error running agent for responses: %s", e, exc_info=True)
return web.json_response(
{"error": {"message": f"Internal server error: {e}", "type": "server_error"}},
status=500,
)
final_response = result.get("final_response", "")
if not final_response:
final_response = result.get("error", "(No response generated)")
response_id = f"resp_{uuid.uuid4().hex[:28]}"
created_at = int(time.time())
# Build the full conversation history for storage
# (includes tool calls from the agent run)
full_history = list(conversation_history)
full_history.append({"role": "user", "content": user_message})
# Add agent's internal messages if available
agent_messages = result.get("messages", [])
if agent_messages:
full_history.extend(agent_messages)
else:
full_history.append({"role": "assistant", "content": final_response})
# Build output items (includes tool calls + final message)
output_items = self._extract_output_items(result)
response_data = {
"id": response_id,
"object": "response",
"status": "completed",
"created_at": created_at,
"model": body.get("model", "hermes-agent"),
"output": output_items,
"usage": {
"input_tokens": usage.get("input_tokens", 0),
"output_tokens": usage.get("output_tokens", 0),
"total_tokens": usage.get("total_tokens", 0),
},
}
# Store the complete response object for future chaining / GET retrieval
if store:
self._response_store.put(response_id, {
"response": response_data,
"conversation_history": full_history,
"instructions": instructions,
})
# Update conversation mapping so the next request with the same
# conversation name automatically chains to this response
if conversation:
self._conversations[conversation] = response_id
return web.json_response(response_data)
# ------------------------------------------------------------------
# GET / DELETE response endpoints
# ------------------------------------------------------------------
async def _handle_get_response(self, request: "web.Request") -> "web.Response":
"""GET /v1/responses/{response_id} — retrieve a stored response."""
auth_err = self._check_auth(request)
if auth_err:
return auth_err
response_id = request.match_info["response_id"]
stored = self._response_store.get(response_id)
if stored is None:
return web.json_response(
{"error": {"message": f"Response not found: {response_id}", "type": "invalid_request_error"}},
status=404,
)
return web.json_response(stored["response"])
async def _handle_delete_response(self, request: "web.Request") -> "web.Response":
"""DELETE /v1/responses/{response_id} — delete a stored response."""
auth_err = self._check_auth(request)
if auth_err:
return auth_err
response_id = request.match_info["response_id"]
deleted = self._response_store.delete(response_id)
if not deleted:
return web.json_response(
{"error": {"message": f"Response not found: {response_id}", "type": "invalid_request_error"}},
status=404,
)
return web.json_response({
"id": response_id,
"object": "response",
"deleted": True,
})
# ------------------------------------------------------------------
# Output extraction helper
# ------------------------------------------------------------------
@staticmethod
def _extract_output_items(result: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Build the full output item array from the agent's messages.
Walks *result["messages"]* and emits:
- ``function_call`` items for each tool_call on assistant messages
- ``function_call_output`` items for each tool-role message
- a final ``message`` item with the assistant's text reply
"""
items: List[Dict[str, Any]] = []
messages = result.get("messages", [])
for msg in messages:
role = msg.get("role")
if role == "assistant" and msg.get("tool_calls"):
for tc in msg["tool_calls"]:
func = tc.get("function", {})
items.append({
"type": "function_call",
"name": func.get("name", ""),
"arguments": func.get("arguments", ""),
"call_id": tc.get("id", ""),
})
elif role == "tool":
items.append({
"type": "function_call_output",
"call_id": msg.get("tool_call_id", ""),
"output": msg.get("content", ""),
})
# Final assistant message
final = result.get("final_response", "")
if not final:
final = result.get("error", "(No response generated)")
items.append({
"type": "message",
"role": "assistant",
"content": [
{
"type": "output_text",
"text": final,
}
],
})
return items
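The walk above can be restated compactly with a worked example of the item sequence it yields for a single tool-call run (illustrative helper with the same semantics, not the production method):

```python
def extract_output_items(result):
    # Same walk as _extract_output_items: tool calls, tool outputs, final message.
    items = []
    for msg in result.get("messages", []):
        if msg.get("role") == "assistant" and msg.get("tool_calls"):
            for tc in msg["tool_calls"]:
                fn = tc.get("function", {})
                items.append({"type": "function_call", "name": fn.get("name", ""),
                              "arguments": fn.get("arguments", ""),
                              "call_id": tc.get("id", "")})
        elif msg.get("role") == "tool":
            items.append({"type": "function_call_output",
                          "call_id": msg.get("tool_call_id", ""),
                          "output": msg.get("content", "")})
    final = result.get("final_response") or result.get("error") or "(No response generated)"
    items.append({"type": "message", "role": "assistant",
                  "content": [{"type": "output_text", "text": final}]})
    return items

# Worked example: one tool call, its output, then the final assistant reply.
sample = {
    "messages": [
        {"role": "assistant", "tool_calls": [
            {"id": "call_1", "function": {"name": "search", "arguments": "{\"q\": \"x\"}"}}]},
        {"role": "tool", "tool_call_id": "call_1", "content": "found it"},
    ],
    "final_response": "Here you go.",
}
items = extract_output_items(sample)
```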
# ------------------------------------------------------------------
# Agent execution
# ------------------------------------------------------------------
async def _run_agent(
self,
user_message: str,
conversation_history: List[Dict[str, str]],
ephemeral_system_prompt: Optional[str] = None,
session_id: Optional[str] = None,
stream_callback=None,
) -> tuple:
"""
Create an agent and run a conversation in a thread executor.
Returns ``(result_dict, usage_dict)`` where *usage_dict* contains
``input_tokens``, ``output_tokens`` and ``total_tokens``.
"""
loop = asyncio.get_running_loop()
def _run():
agent = self._create_agent(
ephemeral_system_prompt=ephemeral_system_prompt,
session_id=session_id,
stream_callback=stream_callback,
)
result = agent.run_conversation(
user_message=user_message,
conversation_history=conversation_history,
)
usage = {
"input_tokens": getattr(agent, "session_prompt_tokens", 0) or 0,
"output_tokens": getattr(agent, "session_completion_tokens", 0) or 0,
"total_tokens": getattr(agent, "session_total_tokens", 0) or 0,
}
return result, usage
return await loop.run_in_executor(None, _run)
# ------------------------------------------------------------------
# BasePlatformAdapter interface
# ------------------------------------------------------------------
async def connect(self) -> bool:
"""Start the aiohttp web server."""
if not AIOHTTP_AVAILABLE:
logger.warning("[%s] aiohttp not installed", self.name)
return False
try:
self._app = web.Application(middlewares=[cors_middleware])
self._app.router.add_get("/health", self._handle_health)
self._app.router.add_get("/v1/models", self._handle_models)
self._app.router.add_post("/v1/chat/completions", self._handle_chat_completions)
self._app.router.add_post("/v1/responses", self._handle_responses)
self._app.router.add_get("/v1/responses/{response_id}", self._handle_get_response)
self._app.router.add_delete("/v1/responses/{response_id}", self._handle_delete_response)
self._runner = web.AppRunner(self._app)
await self._runner.setup()
self._site = web.TCPSite(self._runner, self._host, self._port)
await self._site.start()
self._running = True
logger.info(
"[%s] API server listening on http://%s:%d",
self.name, self._host, self._port,
)
return True
except Exception as e:
logger.error("[%s] Failed to start API server: %s", self.name, e)
return False
async def disconnect(self) -> None:
"""Stop the aiohttp web server."""
self._running = False
if self._site:
await self._site.stop()
self._site = None
if self._runner:
await self._runner.cleanup()
self._runner = None
self._app = None
logger.info("[%s] API server stopped", self.name)
async def send(
self,
chat_id: str,
content: str,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""
Not used — HTTP request/response cycle handles delivery directly.
"""
return SendResult(success=False, error="API server uses HTTP request/response, not send()")
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
"""Return basic info about the API server."""
return {
"name": "API Server",
"type": "api",
"host": self._host,
"port": self._port,
}


@@ -19,6 +19,7 @@ import os
import re
import sys
import signal
import time
import threading
from logging.handlers import RotatingFileHandler
from pathlib import Path
@@ -672,6 +673,13 @@ class GatewayRunner:
return None
return HomeAssistantAdapter(config)
elif platform == Platform.API_SERVER:
from gateway.platforms.api_server import APIServerAdapter, check_api_server_requirements
if not check_api_server_requirements():
logger.warning("API Server: aiohttp not installed")
return None
return APIServerAdapter(config)
return None
def _is_user_authorized(self, source: SessionSource) -> bool:
@@ -691,6 +699,11 @@ class GatewayRunner:
if source.platform == Platform.HOMEASSISTANT:
return True
# API Server handles auth at the HTTP layer (Bearer token),
# so requests that reach the gateway runner are already authorized.
if source.platform == Platform.API_SERVER:
return True
user_id = source.user_id
if not user_id:
return False
@@ -701,6 +714,7 @@ class GatewayRunner:
Platform.WHATSAPP: "WHATSAPP_ALLOWED_USERS",
Platform.SLACK: "SLACK_ALLOWED_USERS",
Platform.SIGNAL: "SIGNAL_ALLOWED_USERS",
Platform.API_SERVER: "API_SERVER_ALLOWED_KEYS",
}
platform_allow_all_map = {
Platform.TELEGRAM: "TELEGRAM_ALLOW_ALL_USERS",
@@ -708,6 +722,7 @@ class GatewayRunner:
Platform.WHATSAPP: "WHATSAPP_ALLOW_ALL_USERS",
Platform.SLACK: "SLACK_ALLOW_ALL_USERS",
Platform.SIGNAL: "SIGNAL_ALLOW_ALL_USERS",
Platform.API_SERVER: "API_SERVER_ALLOW_ALL",
}
# Per-platform allow-all flag (e.g., DISCORD_ALLOW_ALL_USERS=true)
@@ -1340,6 +1355,23 @@ class GatewayRunner:
# Update session
self.session_store.update_session(session_entry.session_key)
# If streaming already delivered the response via progressive edits,
# do a final edit with the post-processed text and suppress the
# normal send to avoid duplicating the message.
_streamed_id = agent_result.get("_streamed_msg_id")
if _streamed_id and response:
adapter = self.adapters.get(source.platform)
if adapter:
try:
await adapter.edit_message(
chat_id=source.chat_id,
message_id=_streamed_id,
content=response,
)
except Exception:
pass
return "" # Suppress normal send in base.py
return response
@@ -2739,7 +2771,35 @@ class GatewayRunner:
agent_holder = [None] # Mutable container for the agent instance
result_holder = [None] # Mutable container for the result
tools_holder = [None] # Mutable container for the tool definitions
# ── Streaming setup ─────────────────────────────────────────────
_stream_q = None
_stream_done = None
_stream_msg_id = [None]
_streaming_enabled = False
try:
import yaml as _s_yaml
_s_cfg_path = _hermes_home / "config.yaml"
if _s_cfg_path.exists():
with open(_s_cfg_path, encoding="utf-8") as _s_f:
_s_data = _s_yaml.safe_load(_s_f) or {}
_s_cfg = _s_data.get("streaming", {})
if isinstance(_s_cfg, dict):
_platform_key = source.platform.value if source.platform else ""
if _platform_key and _s_cfg.get(_platform_key) is not None:
_streaming_enabled = str(_s_cfg[_platform_key]).lower() in ("true", "1", "yes")
else:
_streaming_enabled = str(_s_cfg.get("enabled", False)).lower() in ("true", "1", "yes")
except Exception:
pass
if os.getenv("HERMES_STREAMING_ENABLED", "").lower() in ("true", "1", "yes"):
_streaming_enabled = True
if _streaming_enabled:
_stream_q = queue.Queue()
_stream_done = threading.Event()
# Bridge sync step_callback → async hooks.emit for agent:step events
_loop_for_step = asyncio.get_event_loop()
_hooks_ref = self.hooks
@@ -2812,6 +2872,19 @@ class GatewayRunner:
}
pr = self._provider_routing
# Streaming: build callback that feeds the async queue
_on_stream_token = None
if _stream_q is not None:
_sq = _stream_q # capture for closure
_sd = _stream_done
def _on_stream_token(delta):
if delta is None:
if _sd:
_sd.set()
else:
_sq.put(delta)
agent = AIAgent(
model=model,
**runtime_kwargs,
@@ -2829,6 +2902,7 @@ class GatewayRunner:
provider_require_parameters=pr.get("require_parameters", False),
provider_data_collection=pr.get("data_collection"),
session_id=session_id,
stream_callback=_on_stream_token,
tool_progress_callback=progress_callback if tool_progress_enabled else None,
step_callback=_step_callback_sync if _hooks_ref.loaded_hooks else None,
platform=platform_key,
@@ -2947,19 +3021,93 @@ class GatewayRunner:
unique_tags.insert(0, "[[audio_as_voice]]")
final_response = final_response + "\n" + "\n".join(unique_tags)
return {
_result_dict = {
"final_response": final_response,
"messages": result_holder[0].get("messages", []) if result_holder[0] else [],
"api_calls": result_holder[0].get("api_calls", 0) if result_holder[0] else 0,
"tools": tools_holder[0] or [],
"history_offset": len(agent_history),
}
if _stream_msg_id[0]:
_result_dict["_streamed_msg_id"] = _stream_msg_id[0]
return _result_dict
# Start progress message sender if enabled
progress_task = None
if tool_progress_enabled:
progress_task = asyncio.create_task(send_progress_messages())
# ── Stream preview: progressively edit a message with streaming tokens ──
async def stream_preview():
if not _stream_q or not _stream_done:
return
adapter = self.adapters.get(source.platform)
if not adapter:
return
accumulated = []
token_count = 0
last_edit = 0.0
MIN_TOKENS = 20
EDIT_INTERVAL = 1.5
_metadata = {"thread_id": source.thread_id} if source.thread_id else None
try:
while not _stream_done.is_set():
try:
chunk = _stream_q.get(timeout=0.1)
accumulated.append(chunk)
token_count += 1
except Exception:
continue
now = time.monotonic()
if token_count >= MIN_TOKENS and (now - last_edit) >= EDIT_INTERVAL:
preview = "".join(accumulated) + "▌"  # trailing cursor while streaming
if _stream_msg_id[0] is None:
r = await adapter.send(
chat_id=source.chat_id, content=preview,
metadata=_metadata,
)
if r.success and r.message_id:
_stream_msg_id[0] = r.message_id
else:
await adapter.edit_message(
chat_id=source.chat_id,
message_id=_stream_msg_id[0],
content=preview,
)
last_edit = now
# Drain remaining
while not _stream_q.empty():
try:
accumulated.append(_stream_q.get_nowait())
except Exception:
break
# Final edit: remove cursor
if _stream_msg_id[0] and accumulated:
await adapter.edit_message(
chat_id=source.chat_id,
message_id=_stream_msg_id[0],
content="".join(accumulated),
)
except asyncio.CancelledError:
if _stream_msg_id[0] and accumulated:
try:
await adapter.edit_message(
chat_id=source.chat_id,
message_id=_stream_msg_id[0],
content="".join(accumulated),
)
except Exception:
pass
except Exception as e:
logger.debug("stream_preview error: %s", e)
stream_task = asyncio.create_task(stream_preview()) if _stream_q else None
# Track this agent as running for this session (for interrupt support)
# We do this in a callback after the agent is created
async def track_agent():
@@ -3034,9 +3182,11 @@ class GatewayRunner:
session_key=session_key
)
finally:
# Stop progress sender and interrupt monitor
# Stop progress sender, stream preview, and interrupt monitor
if progress_task:
progress_task.cancel()
if stream_task:
stream_task.cancel()
interrupt_monitor.cancel()
# Clean up tracking
@@ -3045,7 +3195,7 @@ class GatewayRunner:
del self._running_agents[session_key]
# Wait for cancelled tasks
for task in [progress_task, interrupt_monitor, tracking_task]:
for task in [progress_task, stream_task, interrupt_monitor, tracking_task]:
if task:
try:
await task


@@ -175,6 +175,7 @@ class AIAgent:
thinking_callback: callable = None,
clarify_callback: callable = None,
step_callback: callable = None,
stream_callback: callable = None,
max_tokens: int = None,
reasoning_config: Dict[str, Any] = None,
prefill_messages: List[Dict[str, Any]] = None,
@@ -228,6 +229,9 @@ class AIAgent:
polluting trajectories with user-specific persona or project instructions.
honcho_session_key (str): Session key for Honcho integration (e.g., "telegram:123456" or CLI session_id).
When provided and Honcho is enabled in config, enables persistent cross-session user modeling.
stream_callback (callable): Optional callback(text_delta: str) invoked for each
text token during streaming LLM generation. Pass None (end signal) when done.
When set, the agent uses stream=True for API calls. Disabled by default.
"""
self.model = model
self.max_iterations = max_iterations
@@ -262,6 +266,7 @@ class AIAgent:
self.thinking_callback = thinking_callback
self.clarify_callback = clarify_callback
self.step_callback = step_callback
self.stream_callback = stream_callback
self._last_reported_tool = None # Track for "new tool" mode
# Interrupt mechanism for breaking out of tool loops
@@ -1994,8 +1999,20 @@ class AIAgent:
for attempt in range(max_stream_retries + 1):
try:
with self.client.responses.stream(**api_kwargs) as stream:
for _ in stream:
pass
for event in stream:
if self.stream_callback and hasattr(event, 'type'):
if getattr(event, 'type', '') == 'response.output_text.delta':
delta_text = getattr(event, 'delta', '')
if delta_text:
try:
self.stream_callback(delta_text)
except Exception:
pass
if self.stream_callback:
try:
self.stream_callback(None)
except Exception:
pass
return stream.get_final_response()
except RuntimeError as exc:
err_text = str(exc)
@@ -2133,6 +2150,87 @@ class AIAgent:
return True
def _run_streaming_chat_completion(self, api_kwargs: dict):
"""Stream a chat completion, emitting text tokens via stream_callback.
Returns a SimpleNamespace response object compatible with the non-streaming
code path. Falls back to non-streaming on any error.
"""
stream_kwargs = dict(api_kwargs)
stream_kwargs["stream"] = True
# Request usage in the final chunk
stream_kwargs["stream_options"] = {"include_usage": True}
accumulated_content = []
accumulated_tool_calls = {}
final_usage = None
try:
stream = self.client.chat.completions.create(**stream_kwargs)
for chunk in stream:
if not chunk.choices:
if hasattr(chunk, 'usage') and chunk.usage:
final_usage = chunk.usage
continue
delta = chunk.choices[0].delta
if hasattr(delta, 'content') and delta.content:
accumulated_content.append(delta.content)
if self.stream_callback:
try:
self.stream_callback(delta.content)
except Exception:
pass
if hasattr(delta, 'tool_calls') and delta.tool_calls:
for tc_delta in delta.tool_calls:
idx = tc_delta.index
if idx not in accumulated_tool_calls:
accumulated_tool_calls[idx] = {"id": tc_delta.id or "", "name": "", "arguments": ""}
if hasattr(tc_delta, 'function') and tc_delta.function:
if getattr(tc_delta.function, 'name', None):
accumulated_tool_calls[idx]["name"] = tc_delta.function.name
if getattr(tc_delta.function, 'arguments', None):
accumulated_tool_calls[idx]["arguments"] += tc_delta.function.arguments
if self.stream_callback:
try:
self.stream_callback(None) # End signal
except Exception:
pass
tool_calls = []
for idx in sorted(accumulated_tool_calls):
tc = accumulated_tool_calls[idx]
if tc["name"]:
tool_calls.append(SimpleNamespace(
id=tc["id"], type="function",
function=SimpleNamespace(name=tc["name"], arguments=tc["arguments"]),
))
return SimpleNamespace(
choices=[SimpleNamespace(
message=SimpleNamespace(
content="".join(accumulated_content) or "",
tool_calls=tool_calls or None,
role="assistant",
),
finish_reason="tool_calls" if tool_calls else "stop",
)],
usage=final_usage,
model=self.model,
)
except Exception as e:
if self.stream_callback:
try:
self.stream_callback(None)
except Exception:
pass
logger.debug("Streaming chat completion failed, falling back: %s", e)
return self.client.chat.completions.create(**api_kwargs)
def _interruptible_api_call(self, api_kwargs: dict):
"""
Run the API call in a background thread so the main conversation loop
@@ -2148,6 +2246,8 @@ class AIAgent:
try:
if self.api_mode == "codex_responses":
result["response"] = self._run_codex_stream(api_kwargs)
elif self.stream_callback is not None:
result["response"] = self._run_streaming_chat_completion(api_kwargs)
else:
result["response"] = self.client.chat.completions.create(**api_kwargs)
except Exception as e:

File diff suppressed because it is too large

tests/test_streaming.py (new file, 335 lines)

@@ -0,0 +1,335 @@
"""Unit tests for streaming support.
Tests cover:
- _run_streaming_chat_completion: text tokens, tool calls, fallback on error,
no callback, end signal
- _interruptible_api_call routing to streaming when stream_callback is set
- Streaming config reading from config.yaml
"""
import json
import threading
from types import SimpleNamespace
from unittest.mock import MagicMock, patch, PropertyMock
import pytest
from run_agent import AIAgent
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _make_tool_defs(*names: str) -> list:
"""Build minimal tool definition list accepted by AIAgent.__init__."""
return [
{
"type": "function",
"function": {
"name": n,
"description": f"{n} tool",
"parameters": {"type": "object", "properties": {}},
},
}
for n in names
]
@pytest.fixture()
def agent():
"""Minimal AIAgent with mocked client, no stream_callback."""
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
a = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
a.client = MagicMock()
return a
@pytest.fixture()
def streaming_agent():
"""Agent with a stream_callback set."""
collected = []
def _cb(delta):
collected.append(delta)
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
a = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
stream_callback=_cb,
)
a.client = MagicMock()
a._collected_tokens = collected
return a
# ---------------------------------------------------------------------------
# Helpers — build fake streaming chunks
# ---------------------------------------------------------------------------
def _make_text_chunks(*texts):
"""Return a list of SimpleNamespace chunks containing text deltas."""
chunks = []
for t in texts:
chunks.append(SimpleNamespace(
choices=[SimpleNamespace(
delta=SimpleNamespace(content=t, tool_calls=None),
finish_reason=None,
)],
usage=None,
))
# Final chunk with usage info
chunks.append(SimpleNamespace(
choices=[],
usage=SimpleNamespace(prompt_tokens=10, completion_tokens=5, total_tokens=15),
))
return chunks
def _make_tool_call_chunks():
"""Return chunks that simulate a tool call response."""
chunks = [
# First chunk: tool call id + name
SimpleNamespace(
choices=[SimpleNamespace(
delta=SimpleNamespace(
content=None,
tool_calls=[SimpleNamespace(
index=0,
id="call_123",
function=SimpleNamespace(name="web_search", arguments=""),
)],
),
finish_reason=None,
)],
usage=None,
),
# Second chunk: tool call arguments
SimpleNamespace(
choices=[SimpleNamespace(
delta=SimpleNamespace(
content=None,
tool_calls=[SimpleNamespace(
index=0,
id=None,
function=SimpleNamespace(name=None, arguments='{"query": "test"}'),
)],
),
finish_reason=None,
)],
usage=None,
),
# Final usage chunk
SimpleNamespace(choices=[], usage=SimpleNamespace(
prompt_tokens=20, completion_tokens=10, total_tokens=30,
)),
]
return chunks
# ---------------------------------------------------------------------------
# Tests: _run_streaming_chat_completion
# ---------------------------------------------------------------------------
class TestRunStreamingChatCompletion:
"""Tests for AIAgent._run_streaming_chat_completion."""
def test_text_tokens_streamed_via_callback(self, streaming_agent):
"""Text deltas are forwarded to stream_callback and accumulated."""
chunks = _make_text_chunks("Hello", " ", "world")
streaming_agent.client.chat.completions.create.return_value = iter(chunks)
result = streaming_agent._run_streaming_chat_completion({"model": "test"})
assert result.choices[0].message.content == "Hello world"
# Callback received each token + None end signal
assert streaming_agent._collected_tokens == ["Hello", " ", "world", None]
def test_tool_calls_accumulated(self, streaming_agent):
"""Tool call deltas are aggregated into a proper tool_calls list."""
chunks = _make_tool_call_chunks()
streaming_agent.client.chat.completions.create.return_value = iter(chunks)
result = streaming_agent._run_streaming_chat_completion({"model": "test"})
assert result.choices[0].message.tool_calls is not None
tc = result.choices[0].message.tool_calls[0]
assert tc.function.name == "web_search"
assert '"query"' in tc.function.arguments
def test_fallback_on_streaming_error(self, streaming_agent):
"""Falls back to non-streaming on error."""
# First call (streaming) raises; second call (fallback) succeeds
fallback_response = SimpleNamespace(
choices=[SimpleNamespace(
message=SimpleNamespace(content="fallback", tool_calls=None, role="assistant"),
finish_reason="stop",
)],
usage=SimpleNamespace(prompt_tokens=5, completion_tokens=3, total_tokens=8),
model="test",
)
call_count = [0]
def _side_effect(**kwargs):
call_count[0] += 1
if kwargs.get("stream"):
raise ConnectionError("stream broke")
return fallback_response
streaming_agent.client.chat.completions.create.side_effect = _side_effect
result = streaming_agent._run_streaming_chat_completion({"model": "test"})
assert result.choices[0].message.content == "fallback"
assert call_count[0] == 2 # streaming attempt + fallback
# Callback should still get None (end signal) even on error
assert None in streaming_agent._collected_tokens
def test_no_callback_still_works(self, agent):
"""Streaming works even without a callback (just accumulates)."""
chunks = _make_text_chunks("ok")
agent.client.chat.completions.create.return_value = iter(chunks)
result = agent._run_streaming_chat_completion({"model": "test"})
assert result.choices[0].message.content == "ok"
def test_end_signal_sent(self, streaming_agent):
"""stream_callback(None) is sent after all tokens."""
chunks = _make_text_chunks("done")
streaming_agent.client.chat.completions.create.return_value = iter(chunks)
streaming_agent._run_streaming_chat_completion({"model": "test"})
assert streaming_agent._collected_tokens[-1] is None
def test_usage_captured_from_final_chunk(self, streaming_agent):
"""Usage stats from the final usage-only chunk are returned."""
chunks = _make_text_chunks("hi")
streaming_agent.client.chat.completions.create.return_value = iter(chunks)
result = streaming_agent._run_streaming_chat_completion({"model": "test"})
assert result.usage is not None
assert result.usage.prompt_tokens == 10
assert result.usage.completion_tokens == 5
# ---------------------------------------------------------------------------
# Tests: _interruptible_api_call routing
# ---------------------------------------------------------------------------
class TestInterruptibleApiCallRouting:
"""Tests that _interruptible_api_call routes to streaming when callback is set."""
def test_routes_to_streaming_with_callback(self, streaming_agent):
"""When stream_callback is set, _interruptible_api_call uses streaming."""
chunks = _make_text_chunks("streamed")
streaming_agent.client.chat.completions.create.return_value = iter(chunks)
# Mock _interrupt_requested to False
streaming_agent._interrupt_requested = False
result = streaming_agent._interruptible_api_call({"model": "test"})
assert result.choices[0].message.content == "streamed"
# Verify the callback got tokens
assert "streamed" in streaming_agent._collected_tokens
def test_routes_to_normal_without_callback(self, agent):
"""When no stream_callback, _interruptible_api_call uses normal completion."""
normal_response = SimpleNamespace(
choices=[SimpleNamespace(
message=SimpleNamespace(content="normal", tool_calls=None, role="assistant"),
finish_reason="stop",
)],
usage=SimpleNamespace(prompt_tokens=5, completion_tokens=3, total_tokens=8),
model="test",
)
agent.client.chat.completions.create.return_value = normal_response
agent._interrupt_requested = False
result = agent._interruptible_api_call({"model": "test"})
assert result.choices[0].message.content == "normal"
# ---------------------------------------------------------------------------
# Tests: Streaming config
# ---------------------------------------------------------------------------
class TestStreamingConfig:
"""Tests for reading streaming configuration."""
def test_streaming_disabled_by_default(self):
"""Without any config, streaming is disabled."""
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
a = AIAgent(
api_key="test-key",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
assert a.stream_callback is None
def test_stream_callback_stored_on_agent(self):
"""stream_callback passed to constructor is stored on the agent."""
cb = lambda delta: None
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
a = AIAgent(
api_key="test-key",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
stream_callback=cb,
)
assert a.stream_callback is cb
def test_gateway_streaming_config_structure(self):
"""Verify the expected streaming config structure from gateway/run.py."""
# This tests that _read_streaming_config (if it exists) returns
# the right structure. We mock the config file content.
try:
from gateway.run import _read_streaming_config
except ImportError:
pytest.skip("gateway.run._read_streaming_config not available")
mock_cfg = {
"streaming": {
"enabled": True,
"telegram": True,
"discord": False,
"edit_interval": 2.0,
}
}
with patch("builtins.open", MagicMock()):
with patch("yaml.safe_load", return_value=mock_cfg):
try:
result = _read_streaming_config()
assert result.get("enabled") is True
except Exception:
# Function might not exist yet or have different signature
pytest.skip("_read_streaming_config has different interface")


@@ -0,0 +1,217 @@
---
sidebar_position: 14
title: "API Server"
description: "Expose hermes-agent as an OpenAI-compatible API for any frontend"
---
# API Server
The API server exposes hermes-agent as an OpenAI-compatible HTTP endpoint. Any frontend that speaks the OpenAI format — Open WebUI, LobeChat, LibreChat, NextChat, ChatBox, and hundreds more — can connect to hermes-agent and use it as a backend.
Your agent handles requests with its full toolset (terminal, file operations, web search, memory, skills) and returns the final response. Tool calls execute invisibly server-side.
## Quick Start
### 1. Enable the API server
Add to `~/.hermes/.env`:
```bash
API_SERVER_ENABLED=true
```
### 2. Start the gateway
```bash
hermes gateway
```
You'll see:
```
[API Server] API server listening on http://127.0.0.1:8642
```
### 3. Connect a frontend
Point any OpenAI-compatible client at `http://localhost:8642/v1`:
```bash
# Test with curl
curl http://localhost:8642/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{"model": "hermes-agent", "messages": [{"role": "user", "content": "Hello!"}]}'
```
Or connect Open WebUI, LobeChat, or any other frontend — see the [Open WebUI integration guide](/docs/user-guide/messaging/open-webui) for step-by-step instructions.
## Endpoints
### POST /v1/chat/completions
Standard OpenAI Chat Completions format. Stateless — the full conversation is included in each request via the `messages` array.
**Request:**
```json
{
"model": "hermes-agent",
"messages": [
{"role": "system", "content": "You are a Python expert."},
{"role": "user", "content": "Write a fibonacci function"}
],
"stream": false
}
```
**Response:**
```json
{
"id": "chatcmpl-abc123",
"object": "chat.completion",
"created": 1710000000,
"model": "hermes-agent",
"choices": [{
"index": 0,
"message": {"role": "assistant", "content": "Here's a fibonacci function..."},
"finish_reason": "stop"
}],
"usage": {"prompt_tokens": 50, "completion_tokens": 200, "total_tokens": 250}
}
```
**Streaming** (`"stream": true`): Returns Server-Sent Events (SSE) with token-by-token response chunks. When streaming is enabled in config, tokens are emitted live as the LLM generates them. When disabled, the full response is sent as a single SSE chunk.
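On the client side, consuming the SSE stream comes down to parsing `data:` lines and concatenating the content deltas. A minimal sketch (the `parse_sse_deltas` helper is illustrative, not part of hermes-agent):

```python
import json

def parse_sse_deltas(sse_lines):
    """Extract text deltas from OpenAI-style SSE lines.

    Each "data:" line carries a JSON chunk whose choices[0].delta may
    contain a "content" fragment; "data: [DONE]" terminates the stream.
    """
    deltas = []
    for line in sse_lines:
        if not line.startswith("data: "):
            continue
        payload = line[len("data: "):].strip()
        if payload == "[DONE]":
            break
        chunk = json.loads(payload)
        choices = chunk.get("choices") or []
        if choices:
            content = choices[0].get("delta", {}).get("content")
            if content:
                deltas.append(content)
    return deltas

stream = [
    'data: {"choices":[{"delta":{"role":"assistant"}}]}',
    'data: {"choices":[{"delta":{"content":"Hello"}}]}',
    'data: {"choices":[{"delta":{"content":" world"}}]}',
    'data: {"choices":[{"delta":{},"finish_reason":"stop"}]}',
    'data: [DONE]',
]
print("".join(parse_sse_deltas(stream)))  # Hello world
```

In practice the official OpenAI SDK does this parsing for you when pointed at the server's `/v1` base URL.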
### POST /v1/responses
OpenAI Responses API format. Supports server-side conversation state via `previous_response_id` — the server stores full conversation history (including tool calls and results) so multi-turn context is preserved without the client managing it.
**Request:**
```json
{
"model": "hermes-agent",
"input": "What files are in my project?",
"instructions": "You are a helpful coding assistant.",
"store": true
}
```
**Response:**
```json
{
"id": "resp_abc123",
"object": "response",
"status": "completed",
"model": "hermes-agent",
"output": [
{"type": "function_call", "name": "terminal", "arguments": "{\"command\": \"ls\"}", "call_id": "call_1"},
{"type": "function_call_output", "call_id": "call_1", "output": "README.md src/ tests/"},
{"type": "message", "role": "assistant", "content": [{"type": "output_text", "text": "Your project has..."}]}
],
"usage": {"input_tokens": 50, "output_tokens": 200, "total_tokens": 250}
}
```
#### Multi-turn with previous_response_id
Chain responses to maintain full context (including tool calls) across turns:
```json
{
"input": "Now show me the README",
"previous_response_id": "resp_abc123"
}
```
The server reconstructs the full conversation from the stored response chain — all previous tool calls and results are preserved.
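The chaining can be pictured as a linked list of stored turns walked backwards from the requested ID. A toy in-memory model (illustrative only — `ResponseStore` is a hypothetical name, not the server's actual implementation):

```python
class ResponseStore:
    """Toy model of server-side chaining via previous_response_id."""

    def __init__(self):
        self._responses = {}  # response_id -> stored turn

    def store(self, response_id, user_input, output, previous_response_id=None):
        self._responses[response_id] = {
            "input": user_input,
            "output": output,          # would include tool calls/results
            "previous": previous_response_id,
        }

    def reconstruct(self, response_id):
        """Walk the chain backwards, then return turns oldest-first."""
        turns = []
        current = response_id
        while current is not None:
            turn = self._responses[current]
            turns.append((turn["input"], turn["output"]))
            current = turn["previous"]
        return list(reversed(turns))

store = ResponseStore()
store.store("resp_1", "What files are in my project?", "Your project has...")
store.store("resp_2", "Now show me the README", "# README ...",
            previous_response_id="resp_1")
history = store.reconstruct("resp_2")  # oldest turn first
```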
#### Named conversations
Use the `conversation` parameter instead of tracking response IDs:
```json
{"input": "Hello", "conversation": "my-project"}
{"input": "What's in src/?", "conversation": "my-project"}
{"input": "Run the tests", "conversation": "my-project"}
```
The server automatically chains to the latest response in that conversation, similar to the `/title` command for gateway sessions.
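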
### GET /v1/responses/{id}
Retrieve a previously stored response by ID.
### DELETE /v1/responses/{id}
Delete a stored response.
### GET /v1/models
Lists `hermes-agent` as an available model. Required by most frontends for model discovery.
### GET /health
Health check. Returns `{"status": "ok"}`.
## System Prompt Handling
When a frontend sends a `system` message (Chat Completions) or `instructions` field (Responses API), hermes-agent **layers it on top** of its core system prompt. Your agent keeps all its tools, memory, and skills — the frontend's system prompt adds extra instructions.
This means you can customize behavior per-frontend without losing capabilities:
- Open WebUI system prompt: "You are a Python expert. Always include type hints."
- The agent still has terminal, file tools, web search, memory, etc.
## Authentication
Bearer token auth via the `Authorization` header:
```
Authorization: Bearer your-secret-key
```
Configure the key via `API_SERVER_KEY` env var. If no key is set, all requests are allowed (for local-only use).
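The check reduces to a simple predicate. A sketch of the intended behavior (an illustration of the rule above, not the server's exact code):

```python
def is_authorized(auth_header, configured_key):
    """Bearer-token check in the spirit of API_SERVER_KEY handling.

    If no key is configured, every request is allowed (local-only use);
    otherwise the Authorization header must be exactly "Bearer <key>".
    """
    if not configured_key:
        return True  # open access when no key is set
    if not auth_header or not auth_header.startswith("Bearer "):
        return False
    return auth_header[len("Bearer "):] == configured_key

assert is_authorized(None, "")                    # no key configured
assert is_authorized("Bearer secret", "secret")
assert not is_authorized("Bearer wrong", "secret")
```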
## Configuration
### Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `API_SERVER_ENABLED` | `false` | Enable the API server |
| `API_SERVER_PORT` | `8642` | HTTP server port |
| `API_SERVER_HOST` | `127.0.0.1` | Bind address (localhost only by default) |
| `API_SERVER_KEY` | _(none)_ | Bearer token for auth |
### config.yaml
```yaml
# Not yet supported — use environment variables.
# config.yaml support coming in a future release.
```
## CORS
The API server includes CORS headers on all responses (`Access-Control-Allow-Origin: *`), so browser-based frontends can connect directly.
## Compatible Frontends
Any frontend that supports the OpenAI API format works. Tested/documented integrations:
| Frontend | Stars | Connection |
|----------|-------|------------|
| [Open WebUI](/docs/user-guide/messaging/open-webui) | 126k | Full guide available |
| LobeChat | 73k | Custom provider endpoint |
| LibreChat | 34k | Custom endpoint in librechat.yaml |
| AnythingLLM | 56k | Generic OpenAI provider |
| NextChat | 87k | BASE_URL env var |
| ChatBox | 39k | API Host setting |
| Jan | 26k | Remote model config |
| HF Chat-UI | 8k | OPENAI_BASE_URL |
| big-AGI | 7k | Custom endpoint |
| OpenAI Python SDK | — | `OpenAI(base_url="http://localhost:8642/v1")` |
| curl | — | Direct HTTP requests |
## Limitations
- **Response storage is in-memory** — stored responses (for `previous_response_id`) are lost on gateway restart. Max 100 stored responses (LRU eviction).
- **No file upload** — vision/document analysis via uploaded files is not yet supported through the API.
- **Model field is cosmetic** — the `model` field in requests is accepted but the actual LLM model used is configured server-side in config.yaml.


@@ -0,0 +1,210 @@
---
sidebar_position: 15
title: "Streaming"
description: "Token-by-token live response display across all platforms"
---
# Streaming Responses
When enabled, hermes-agent streams LLM responses token-by-token instead of waiting for the full generation. Users see the response typing out live — the same experience as ChatGPT, Claude, or Gemini.
Streaming is **disabled by default** and can be enabled globally or per-platform.
## How It Works
```
LLM generates tokens → callback fires per token → queue → consumer displays
Telegram/Discord/Slack:
Token arrives → Accumulate → Every 1.5s, edit the message with new text + ▌ cursor
Done → Final edit removes cursor
API Server:
Token arrives → SSE event sent to client immediately
Done → finish chunk + [DONE]
```
The agent's internal operation doesn't change — tools still execute normally, memory and skills work as before. Streaming only affects how the **final text response** is delivered to the user.
## Enable Streaming
### Option 1: Environment variable
```bash
# Enable for all platforms
export HERMES_STREAMING_ENABLED=true
hermes gateway
```
### Option 2: config.yaml
```yaml
streaming:
enabled: true # Master switch
```
### Option 3: Per-platform
```yaml
streaming:
enabled: false # Off by default
telegram: true # But on for Telegram
discord: true # And Discord
api_server: true # And the API server
```
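The precedence implied above — `HERMES_STREAMING_ENABLED` forces streaming on, a per-platform key overrides the master switch, and the master switch is the fallback — can be sketched as a small resolver (illustrative; the gateway inlines this logic rather than exposing a function of this name):

```python
def resolve_streaming(streaming_cfg, platform, env_override=""):
    """Resolve whether streaming is on: env var > per-platform > master switch."""
    truthy = ("true", "1", "yes")
    if env_override.lower() in truthy:
        return True  # environment variable forces streaming on
    if platform and streaming_cfg.get(platform) is not None:
        return str(streaming_cfg[platform]).lower() in truthy
    return str(streaming_cfg.get("enabled", False)).lower() in truthy

cfg = {"enabled": False, "telegram": True, "discord": False}
resolve_streaming(cfg, "telegram")                    # True: per-platform override
resolve_streaming(cfg, "slack")                       # False: falls back to master switch
resolve_streaming(cfg, "slack", env_override="true")  # True: env var wins
```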
## Platform Support
| Platform | Streaming Method | Rate Limit | Notes |
|----------|-----------------|------------|-------|
| **Telegram** | Progressive message editing | ~20 edits/min | 1.5s edit interval, ▌ cursor |
| **Discord** | Progressive message editing | 5 edits/5s | 1.5s edit interval |
| **Slack** | Progressive message editing | ~50 calls/min | 1.5s edit interval |
| **API Server** | SSE (Server-Sent Events) | No limit | Real token-by-token events |
| **WhatsApp** | ❌ Not supported | — | No message editing API |
| **Home Assistant** | ❌ Not supported | — | No message editing API |
| **CLI** | ❌ Not yet implemented | — | KawaiiSpinner provides feedback |
Platforms without message editing support automatically fall back to non-streaming (the response appears all at once, as before).
## What Users See
### Telegram/Discord/Slack
1. Agent starts working (typing indicator shows)
2. After ~20 tokens, a message appears with partial text and a ▌ cursor
3. Every 1.5 seconds, the message is edited with more accumulated text
4. When the response is complete, the cursor disappears
Tool progress messages still work alongside streaming — tool names/previews appear as before, and the streamed response is shown in a separate message.
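The steps above can be sketched as a small accumulator loop. This is a minimal illustration, not Hermes' actual gateway code — `stream_preview` and the injected `edit_message`/`clock` parameters are hypothetical names:

```python
import time

CURSOR = "▌"

def stream_preview(tokens, edit_message, min_tokens=20, edit_interval=1.5,
                   clock=time.monotonic):
    """Accumulate streamed tokens and periodically edit a platform message.

    `edit_message(text)` stands in for the platform's edit call (e.g.
    Telegram's editMessageText); `tokens` is any iterable of text deltas.
    """
    buffer = []
    last_edit = None
    for token in tokens:
        buffer.append(token)
        now = clock()
        # Wait for min_tokens before the first display, then throttle edits.
        if len(buffer) >= min_tokens and (last_edit is None
                                          or now - last_edit >= edit_interval):
            edit_message("".join(buffer) + CURSOR)
            last_edit = now
    final = "".join(buffer)
    edit_message(final)  # final edit removes the cursor
    return final
```

The injected clock keeps the throttle testable; in production the defaults match the documented 20-token / 1.5-second behavior.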
### API Server (frontends like Open WebUI)
When `stream: true` is set in the request, the API server returns Server-Sent Events:
```
data: {"choices":[{"delta":{"role":"assistant"}}]}
data: {"choices":[{"delta":{"content":"Here"}}]}
data: {"choices":[{"delta":{"content":" is"}}]}
data: {"choices":[{"delta":{"content":" the"}}]}
data: {"choices":[{"delta":{"content":" answer"}}]}
data: {"choices":[{"delta":{},"finish_reason":"stop"}]}
data: [DONE]
```
Frontends like Open WebUI display this as live typing.
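A sketch of how such a stream can be produced from text deltas — `sse_chunks` is a hypothetical helper that mirrors the chunk shape shown above, not the server's actual implementation:

```python
import json

def sse_chunks(deltas, model="hermes-agent"):
    """Yield Server-Sent Events lines in OpenAI chat-completion chunk format."""
    # Opening chunk announces the assistant role.
    yield "data: " + json.dumps(
        {"model": model, "choices": [{"delta": {"role": "assistant"}}]})
    # One chunk per text delta.
    for text in deltas:
        yield "data: " + json.dumps(
            {"model": model, "choices": [{"delta": {"content": text}}]})
    # Finish chunk, then the [DONE] sentinel.
    yield "data: " + json.dumps(
        {"model": model, "choices": [{"delta": {}, "finish_reason": "stop"}]})
    yield "data: [DONE]"
```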
## How It Works Internally
### Architecture
```
┌─────────────┐  stream_callback(delta)  ┌──────────────────┐
│   LLM API   │ ───────────────────────► │  queue.Queue()   │
│  (stream)   │  (runs in agent thread)  │  (thread-safe)   │
└─────────────┘                          └────────┬─────────┘
                                ┌─────────────────┼───────────────┐
                                │                 │               │
                          ┌─────▼─────┐     ┌─────▼────┐    ┌──▼──────┐
                          │  Gateway  │     │ API Svr  │    │   CLI   │
                          │ edit msg  │     │ SSE evt  │    │ (TODO)  │
                          └───────────┘     └──────────┘    └─────────┘
```
1. `AIAgent.__init__` accepts an optional `stream_callback` function
2. When set, `_interruptible_api_call()` routes to `_run_streaming_chat_completion()` instead of the normal non-streaming path
3. The streaming method calls the OpenAI API with `stream=True`, iterates chunks, and calls `stream_callback(delta_text)` for each text token
4. Tool call deltas are accumulated silently (no streaming for tool arguments)
5. When the stream ends, `stream_callback(None)` signals completion
6. The method returns a fake response object compatible with the existing code path
7. If streaming fails for any reason, it falls back to a normal non-streaming API call
### Thread Safety
The agent runs in a background thread (via `_interruptible_api_call`). The consumer (gateway async task, API server SSE writer) runs in the main event loop. A `queue.Queue` bridges them — it's thread-safe by design.
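A minimal, self-contained illustration of this bridge (hypothetical names, not the gateway's actual code): the producer thread pushes tokens onto a `queue.Queue`, and the async consumer pulls them via `run_in_executor` so the event loop never blocks.

```python
import asyncio
import queue
import threading

def bridge_stream(produce_tokens):
    """Bridge tokens from a worker thread into an asyncio consumer."""
    q: "queue.Queue[str | None]" = queue.Queue()

    def producer():
        for token in produce_tokens():
            q.put(token)
        q.put(None)  # sentinel: stream finished

    async def consumer():
        loop = asyncio.get_running_loop()
        received = []
        while True:
            # Blocking q.get() runs in a thread pool, off the event loop.
            token = await loop.run_in_executor(None, q.get)
            if token is None:
                return received
            received.append(token)

    threading.Thread(target=producer, daemon=True).start()
    return asyncio.run(consumer())
```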
### Graceful Fallback
If the LLM provider doesn't support `stream=True` or the streaming connection fails, the agent automatically falls back to a non-streaming API call. The user gets the response normally, just without the live typing effect. No error is shown.
## Configuration Reference
```yaml
streaming:
  enabled: false       # Master switch (default: off)

  # Per-platform overrides (optional):
  telegram: true       # Enable for Telegram
  discord: true        # Enable for Discord
  slack: true          # Enable for Slack
  api_server: true     # Enable for API server

  # Tuning (optional):
  edit_interval: 1.5   # Seconds between message edits (default: 1.5)
  min_tokens: 20       # Tokens before first display (default: 20)
```
| Variable | Default | Description |
|----------|---------|-------------|
| `HERMES_STREAMING_ENABLED` | `false` | Master switch via env var |
| `streaming.enabled` | `false` | Master switch via config |
| `streaming.<platform>` | _(unset)_ | Per-platform override |
| `streaming.edit_interval` | `1.5` | Seconds between Telegram/Discord edits |
| `streaming.min_tokens` | `20` | Minimum tokens before first message |
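As an illustration, per-platform resolution might look like the sketch below. The per-platform-overrides-master rule comes from the docs above; the exact precedence between the env var and `streaming.enabled` in Hermes' loader is an assumption here:

```python
import os

def streaming_enabled(platform, config):
    """Resolve whether streaming is on for a platform (illustrative only)."""
    streaming = config.get("streaming", {})
    if platform in streaming:          # explicit per-platform override wins
        return bool(streaming[platform])
    env = os.environ.get("HERMES_STREAMING_ENABLED")
    if env is not None:                # assumed: env var beats config default
        return env.strip().lower() in ("1", "true", "yes")
    return bool(streaming.get("enabled", False))
```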
## Interaction with Other Features
### Tool Execution
When the agent calls tools (terminal, file operations, web search, etc.), no text tokens are generated — tool arguments are accumulated silently. Tool progress messages continue to work as before. After tools finish, the next LLM call may produce the final text response, which streams normally.
### Context Compression
Compression happens between API calls, not during streaming. No interaction.
### Interrupts
If the user sends a new message while streaming, the agent is interrupted. The HTTP connection is closed (stopping token generation), accumulated text is shown as-is, and the new message is processed.
### Prompt Caching
Streaming doesn't affect prompt caching — the request is identical, just with `stream=True` added.
### Responses API (Codex)
The Codex/Responses API streaming path also supports the `stream_callback`. Token deltas from `response.output_text.delta` events are emitted via the callback.
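A sketch of dispatching those events to the callback — event shapes are simplified dicts here; the real SDK yields typed objects, and the `response.completed` handling is an assumption for illustration:

```python
def emit_output_text(events, stream_callback):
    """Forward text deltas from Responses-API stream events to a callback."""
    text = []
    for event in events:
        if event.get("type") == "response.output_text.delta":
            stream_callback(event["delta"])
            text.append(event["delta"])
        elif event.get("type") == "response.completed":
            stream_callback(None)  # end-of-stream signal
    return "".join(text)
```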
## Troubleshooting
### Streaming isn't working
1. Check the config: `streaming.enabled: true` in config.yaml or `HERMES_STREAMING_ENABLED=true`
2. Check per-platform: `streaming.telegram: true` overrides the master switch
3. Restart the gateway after changing config
4. Check logs for "Streaming failed, falling back" — indicates the provider may not support streaming
### Response appears twice
If you see the response both in a progressively-edited message AND as a separate final message, this is a bug. The streaming system should suppress the normal send when tokens were delivered via streaming. Please file an issue.
### Messages update too slowly
The default edit interval is 1.5 seconds (to respect platform rate limits). You can lower it in config:
```yaml
streaming:
  edit_interval: 1.0   # Faster updates (may hit rate limits)
```
Going below 1.0s risks Telegram rate limiting (429 errors).
### No streaming on WhatsApp/HomeAssistant
These platforms don't support message editing, so streaming automatically falls back to non-streaming. This is expected behavior.

website/docs/user-guide/messaging/index.md

@@ -1,12 +1,12 @@
 ---
 sidebar_position: 1
 title: "Messaging Gateway"
-description: "Chat with Hermes from Telegram, Discord, Slack, WhatsApp, or Signal — architecture and setup overview"
+description: "Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, or any OpenAI-compatible frontend — architecture and setup overview"
 ---
 # Messaging Gateway
-Chat with Hermes from Telegram, Discord, Slack, WhatsApp, or Signal. The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages.
+Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, or any OpenAI-compatible frontend (Open WebUI, LobeChat, etc.). The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages.
 ## Architecture
@@ -15,10 +15,10 @@ Chat with Hermes from Telegram, Discord, Slack, WhatsApp, or Signal. The gateway
 │                         Hermes Gateway                          │
 ├────────────────────────────────────────────────────────────────┤
 │                                                                │
-│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ │
-│ │ Telegram │ │ Discord  │ │ WhatsApp │ │  Slack   │ │ Signal │ │
-│ │ Adapter  │ │ Adapter  │ │ Adapter  │ │ Adapter  │ │ Adapter│ │
-│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └───┬────┘ │
+│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ ┌──────────┐ │
+│ │ Telegram │ │ Discord  │ │ WhatsApp │ │  Slack   │ │ Signal │ │API Server│ │
+│ │ Adapter  │ │ Adapter  │ │ Adapter  │ │ Adapter  │ │ Adapter│ │ (OpenAI) │ │
+│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └───┬────┘ └────┬─────┘ │
 │      │            │            │            │           │      │
 │      └────────────┼────────────┼────────────┼───────────┘      │
 │                                │                               │

website/docs/user-guide/messaging/open-webui.md

@@ -0,0 +1,213 @@
---
sidebar_position: 8
title: "Open WebUI"
description: "Connect Open WebUI to Hermes Agent via the OpenAI-compatible API server"
---
# Open WebUI Integration
[Open WebUI](https://github.com/open-webui/open-webui) (126k★) is the most popular self-hosted chat interface for AI. With Hermes Agent's built-in API server, you can use Open WebUI as a polished web frontend for your agent — complete with conversation management, user accounts, and a modern chat interface.
## Architecture
```
┌──────────────────┐   POST /v1/chat/completions    ┌──────────────────────┐
│    Open WebUI    │ ──────────────────────────────► │     hermes-agent     │
│   (browser UI)   │      SSE streaming response     │  gateway API server  │
│    port 3000     │ ◄────────────────────────────── │      port 8642       │
└──────────────────┘                                 └──────────────────────┘
```
Open WebUI connects to Hermes Agent's API server just like it would connect to OpenAI. Your agent handles the requests with its full toolset — terminal, file operations, web search, memory, skills — and returns the final response.
## Quick Setup
### 1. Enable the API server
Add to `~/.hermes/.env`:
```bash
API_SERVER_ENABLED=true
# Optional: set a key for auth (recommended if accessible beyond localhost)
# API_SERVER_KEY=your-secret-key
```
### 2. Start Hermes Agent gateway
```bash
hermes gateway
```
You should see:
```
[API Server] API server listening on http://127.0.0.1:8642
```
### 3. Start Open WebUI
```bash
docker run -d -p 3000:8080 \
  -e OPENAI_API_BASE_URL=http://host.docker.internal:8642/v1 \
  -e OPENAI_API_KEY=not-needed \
  --add-host=host.docker.internal:host-gateway \
  -v open-webui:/app/backend/data \
  --name open-webui \
  --restart always \
  ghcr.io/open-webui/open-webui:main
```
If you set an `API_SERVER_KEY`, use it instead of `not-needed`:
```bash
-e OPENAI_API_KEY=your-secret-key
```
### 4. Open the UI
Go to **http://localhost:3000**. Create your admin account (the first user becomes admin). You should see **hermes-agent** in the model dropdown. Start chatting!
## Docker Compose Setup
For a more permanent setup, create a `docker-compose.yml`:
```yaml
services:
  open-webui:
    image: ghcr.io/open-webui/open-webui:main
    ports:
      - "3000:8080"
    volumes:
      - open-webui:/app/backend/data
    environment:
      - OPENAI_API_BASE_URL=http://host.docker.internal:8642/v1
      - OPENAI_API_KEY=not-needed
    extra_hosts:
      - "host.docker.internal:host-gateway"
    restart: always

volumes:
  open-webui:
```
Then:
```bash
docker compose up -d
```
## Configuring via the Admin UI
If you prefer to configure the connection through the UI instead of environment variables:
1. Log in to Open WebUI at **http://localhost:3000**
2. Click your **profile avatar****Admin Settings**
3. Go to **Connections**
4. Under **OpenAI API**, click the **wrench icon** (Manage)
5. Click **+ Add New Connection**
6. Enter:
- **URL**: `http://host.docker.internal:8642/v1`
- **API Key**: your key or any non-empty value (e.g., `not-needed`)
7. Click the **checkmark** to verify the connection
8. **Save**
The **hermes-agent** model should now appear in the model dropdown.
:::warning
Environment variables only take effect on Open WebUI's **first launch**. After that, connection settings are stored in its internal database. To change them later, use the Admin UI or delete the Docker volume and start fresh.
:::
## API Type: Chat Completions vs Responses
Open WebUI supports two API modes when connecting to a backend:
| Mode | Format | When to use |
|------|--------|-------------|
| **Chat Completions** (default) | `/v1/chat/completions` | Recommended. Works out of the box. |
| **Responses** (experimental) | `/v1/responses` | For server-side conversation state via `previous_response_id`. |
### Using Chat Completions (recommended)
This is the default and requires no extra configuration. Open WebUI sends standard OpenAI-format requests and Hermes Agent responds accordingly. Each request includes the full conversation history.
### Using Responses API
To use the Responses API mode:
1. Go to **Admin Settings****Connections****OpenAI****Manage**
2. Edit your hermes-agent connection
3. Change **API Type** from "Chat Completions" to **"Responses (Experimental)"**
4. Save
With the Responses API, Open WebUI sends requests in the Responses format (`input` array + `instructions`), and Hermes Agent can preserve full tool call history across turns via `previous_response_id`.
:::note
Open WebUI currently manages conversation history client-side even in Responses mode — it sends the full message history in each request rather than using `previous_response_id`. The Responses API mode is mainly useful for future compatibility as frontends evolve.
:::
## How It Works
When you send a message in Open WebUI:
1. Open WebUI sends a `POST /v1/chat/completions` request with your message and conversation history
2. Hermes Agent creates an AIAgent instance with its full toolset
3. The agent processes your request — it may call tools (terminal, file operations, web search, etc.)
4. Tool calls happen invisibly server-side
5. The agent's final text response is returned to Open WebUI
6. Open WebUI displays the response in its chat interface
Your agent has access to all the same tools and capabilities as when using the CLI or Telegram — the only difference is the frontend.
## Configuration Reference
### Hermes Agent (API server)
| Variable | Default | Description |
|----------|---------|-------------|
| `API_SERVER_ENABLED` | `false` | Enable the API server |
| `API_SERVER_PORT` | `8642` | HTTP server port |
| `API_SERVER_HOST` | `127.0.0.1` | Bind address |
| `API_SERVER_KEY` | _(none)_ | Bearer token for auth. No key = allow all. |
### Open WebUI
| Variable | Description |
|----------|-------------|
| `OPENAI_API_BASE_URL` | Hermes Agent's API URL (include `/v1`) |
| `OPENAI_API_KEY` | Must be non-empty. Match your `API_SERVER_KEY`. |
## Troubleshooting
### No models appear in the dropdown
- **Check the URL has `/v1` suffix**: `http://host.docker.internal:8642/v1` (not just `:8642`)
- **Verify the gateway is running**: `curl http://localhost:8642/health` should return `{"status": "ok"}`
- **Check model listing**: `curl http://localhost:8642/v1/models` should return a list with `hermes-agent`
- **Docker networking**: From inside Docker, `localhost` means the container, not your host. Use `host.docker.internal` or `--network=host`.
### Connection test passes but no models load
This is almost always the missing `/v1` suffix. Open WebUI's connection test is a basic connectivity check — it doesn't verify model listing works.
### Response takes a long time
Hermes Agent may be executing multiple tool calls (reading files, running commands, searching the web) before producing its final response. This is normal for complex queries. The response appears all at once when the agent finishes.
### "Invalid API key" errors
Make sure your `OPENAI_API_KEY` in Open WebUI matches the `API_SERVER_KEY` in Hermes Agent. If no key is configured on the Hermes side, any non-empty value works.
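Based on the documented behavior (no key configured = allow all; otherwise the Bearer token must match), a server-side check might look like this sketch — not Hermes' actual handler:

```python
import hmac

def authorize(headers, api_server_key):
    """Check a request's Authorization header against API_SERVER_KEY."""
    if not api_server_key:
        return True  # no key configured: allow all requests
    auth = headers.get("Authorization", "")
    if not auth.startswith("Bearer "):
        return False
    token = auth[len("Bearer "):]
    # Constant-time comparison avoids timing side channels.
    return hmac.compare_digest(token, api_server_key)
```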
## Linux Docker (no Docker Desktop)
On Linux without Docker Desktop, `host.docker.internal` doesn't resolve by default. Options:
```bash
# Option 1: Add host mapping
docker run --add-host=host.docker.internal:host-gateway ...

# Option 2: Use host networking
docker run --network=host -e OPENAI_API_BASE_URL=http://localhost:8642/v1 ...

# Option 3: Use Docker bridge IP
docker run -e OPENAI_API_BASE_URL=http://172.17.0.1:8642/v1 ...
```