Compare commits

...

10 Commits

Author SHA1 Message Date
teknium1
79b3d36ba8 docs: add reply threading mode section to Telegram docs 2026-03-11 09:16:11 -07:00
Raul
1334d5f014 feat(gateway): Telegram reply threading modes (off/first/all)
Add configurable reply_to_mode for Telegram multi-chunk replies:
- off: never thread replies to original message
- first: only first chunk threads (default, preserves current behavior)
- all: all chunks thread to original message

Configurable via reply_to_mode in platform config or TELEGRAM_REPLY_TO_MODE
env var.

Cherry-picked from PR #855 by raulvidis, rebased onto current main.
Dropped asyncio_mode=auto pyproject.toml change, added @pytest.mark.asyncio
decorators, fixed test IDs to use numeric strings.

Co-authored-by: Raul <77628552+raulvidis@users.noreply.github.com>
2026-03-11 09:14:41 -07:00
teknium1
b800e63137 fix: clean up API server — remove dead code, deduplicate model resolution, cache streaming config, add setup integration and security docs
- Remove unused _write_sse_chat_completion pseudo-streaming method (dead code)
- Extract _resolve_model() helper in gateway/run.py, use from api_server
- Cache streaming config at GatewayRunner init instead of YAML parsing per-message
- Add API_SERVER_* env vars to OPTIONAL_ENV_VARS for hermes setup integration
- Add security warning about network exposure without API_SERVER_KEY
2026-03-11 09:01:17 -07:00
teknium1
d54280ea03 docs: comprehensive documentation for API server, streaming, and Open WebUI
Cherry-picked from PR #828, resolved conflicts with main.
2026-03-11 08:57:35 -07:00
teknium1
95d221c31c feat: add streaming LLM response support across all platforms
Cherry-picked from PR #828, resolved conflicts with main.
2026-03-11 08:56:37 -07:00
teknium1
b2a4092783 docs: add Open WebUI integration guide
Cherry-picked from PR #828.
2026-03-11 08:54:12 -07:00
teknium1
b3c798d1b6 feat: add pseudo-streaming SSE + conversation parameter
Cherry-picked from PR #828.
2026-03-11 08:54:07 -07:00
teknium1
7ae208bfee feat: add conversation parameter + named session chaining
Cherry-picked from PR #828.
2026-03-11 08:54:00 -07:00
teknium1
7d771c2b1b feat: enhance Responses API — retrieval, deletion, tool calls, usage, CORS
Cherry-picked from PR #828.
2026-03-11 08:53:54 -07:00
teknium1
58dc5c4af1 feat: add OpenAI-compatible API server platform adapter (Phase 1)
Cherry-picked from PR #828, rebased onto current main with conflict resolution.
2026-03-11 08:53:47 -07:00
15 changed files with 3718 additions and 25 deletions

View File

@@ -219,6 +219,22 @@ compression:
# Options: "auto", "openrouter", "nous", "main"
# summary_provider: "auto"
# =============================================================================
# Streaming (live token-by-token response display)
# =============================================================================
# When enabled, LLM responses stream token-by-token instead of appearing
# all at once. Supported on Telegram, Discord, and Slack (via message
# editing), and on the API server (via SSE). Disabled by default.
#
# streaming:
# enabled: false # Master switch (default: off)
# # Per-platform overrides:
# # telegram: true
# # discord: true
# # api_server: true
# # edit_interval: 1.5 # Seconds between message edits (default: 1.5)
# # min_tokens: 20 # Tokens before first display (default: 20)
# =============================================================================
# Auxiliary Models (Advanced — Experimental)
# =============================================================================

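The precedence documented above is resolved once at gateway startup. A minimal sketch of that resolution, assuming the parsed `streaming:` block is passed in as a dict (the helper name is illustrative; it mirrors `_load_streaming_config` / `_is_streaming_enabled` added to gateway/run.py later in this diff):

```python
import os

def effective_streaming(streaming_cfg: dict, platform_key: str) -> bool:
    """streaming_cfg is the parsed 'streaming:' block from config.yaml (may be empty)."""
    cfg = dict(streaming_cfg or {})
    # HERMES_STREAMING_ENABLED=true turns the global switch on regardless of the file.
    if os.getenv("HERMES_STREAMING_ENABLED", "").lower() in ("true", "1", "yes"):
        cfg["enabled"] = True
    # A per-platform key (telegram / discord / api_server) overrides the global flag.
    if platform_key and cfg.get(platform_key) is not None:
        return str(cfg[platform_key]).lower() in ("true", "1", "yes")
    return str(cfg.get("enabled", False)).lower() in ("true", "1", "yes")

# {"enabled": False, "telegram": True} -> streaming only for Telegram.
```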
View File

@@ -29,6 +29,7 @@ class Platform(Enum):
SIGNAL = "signal"
HOMEASSISTANT = "homeassistant"
EMAIL = "email"
API_SERVER = "api_server"
@dataclass
@@ -98,6 +99,12 @@ class PlatformConfig:
api_key: Optional[str] = None # API key if different from token
home_channel: Optional[HomeChannel] = None
# Reply threading mode (Telegram/Slack)
# - "off": Never thread replies to original message
# - "first": Only first chunk threads to user's message (default)
# - "all": All chunks in multi-part replies thread to user's message
reply_to_mode: str = "first"
# Platform-specific settings
extra: Dict[str, Any] = field(default_factory=dict)
@@ -105,6 +112,7 @@ class PlatformConfig:
result = {
"enabled": self.enabled,
"extra": self.extra,
"reply_to_mode": self.reply_to_mode,
}
if self.token:
result["token"] = self.token
@@ -125,6 +133,7 @@ class PlatformConfig:
token=data.get("token"),
api_key=data.get("api_key"),
home_channel=home_channel,
reply_to_mode=data.get("reply_to_mode", "first"),
extra=data.get("extra", {}),
)
@@ -171,6 +180,9 @@ class GatewayConfig:
# Email uses extra dict for config (address + imap_host + smtp_host)
elif platform == Platform.EMAIL and config.extra.get("address"):
connected.append(platform)
# API Server uses enabled flag only (no token needed)
elif platform == Platform.API_SERVER:
connected.append(platform)
return connected
def get_home_channel(self, platform: Platform) -> Optional[HomeChannel]:
@@ -346,6 +358,11 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
config.platforms[Platform.TELEGRAM].enabled = True
config.platforms[Platform.TELEGRAM].token = telegram_token
# Reply threading mode for Telegram (off/first/all)
telegram_reply_mode = os.getenv("TELEGRAM_REPLY_TO_MODE", "").lower()
if telegram_reply_mode in ("off", "first", "all"):
config.platforms[Platform.TELEGRAM].reply_to_mode = telegram_reply_mode
telegram_home = os.getenv("TELEGRAM_HOME_CHANNEL")
if telegram_home and Platform.TELEGRAM in config.platforms:
config.platforms[Platform.TELEGRAM].home_channel = HomeChannel(
@@ -446,6 +463,25 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
name=os.getenv("EMAIL_HOME_ADDRESS_NAME", "Home"),
)
# API Server
api_server_enabled = os.getenv("API_SERVER_ENABLED", "").lower() in ("true", "1", "yes")
api_server_key = os.getenv("API_SERVER_KEY", "")
api_server_port = os.getenv("API_SERVER_PORT")
api_server_host = os.getenv("API_SERVER_HOST")
if api_server_enabled or api_server_key:
if Platform.API_SERVER not in config.platforms:
config.platforms[Platform.API_SERVER] = PlatformConfig()
config.platforms[Platform.API_SERVER].enabled = True
if api_server_key:
config.platforms[Platform.API_SERVER].extra["key"] = api_server_key
if api_server_port:
try:
config.platforms[Platform.API_SERVER].extra["port"] = int(api_server_port)
except ValueError:
pass
if api_server_host:
config.platforms[Platform.API_SERVER].extra["host"] = api_server_host
# Session settings
idle_minutes = os.getenv("SESSION_IDLE_MINUTES")
if idle_minutes:

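For reference, a short sketch of how the new `reply_to_mode` field round-trips through `PlatformConfig` (values are illustrative; behavior follows `from_dict` / `to_dict` and `_apply_env_overrides` above):

```python
from gateway.config import PlatformConfig

cfg = PlatformConfig.from_dict({"enabled": True, "token": "123:abc", "reply_to_mode": "all"})
assert cfg.reply_to_mode == "all"
assert cfg.to_dict()["reply_to_mode"] == "all"

# Omitting the key keeps the previous behavior: only the first chunk threads.
assert PlatformConfig.from_dict({"enabled": True}).reply_to_mode == "first"

# TELEGRAM_REPLY_TO_MODE=off|first|all overrides the file value at startup;
# any other value is ignored and the configured mode is kept.
```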
View File

@@ -0,0 +1,783 @@
"""
OpenAI-compatible API server platform adapter.
Exposes an HTTP server with endpoints:
- POST /v1/chat/completions — OpenAI Chat Completions format (stateless)
- POST /v1/responses — OpenAI Responses API format (stateful via previous_response_id)
- GET /v1/responses/{response_id} — Retrieve a stored response
- DELETE /v1/responses/{response_id} — Delete a stored response
- GET /v1/models — lists hermes-agent as an available model
- GET /health — health check
Any OpenAI-compatible frontend (Open WebUI, LobeChat, etc.) can connect
to hermes-agent through this adapter.
Requires:
- aiohttp (already available in the gateway)
"""
import asyncio
import collections
import json
import logging
import os
import time
import uuid
from functools import wraps
from typing import Any, Dict, List, Optional
try:
import aiohttp
from aiohttp import web
AIOHTTP_AVAILABLE = True
except ImportError:
AIOHTTP_AVAILABLE = False
aiohttp = None # type: ignore[assignment]
web = None # type: ignore[assignment]
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
BasePlatformAdapter,
MessageEvent,
MessageType,
SendResult,
)
logger = logging.getLogger(__name__)
# Default settings
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 8642
MAX_STORED_RESPONSES = 100
def check_api_server_requirements() -> bool:
"""Check if API server dependencies are available."""
return AIOHTTP_AVAILABLE
class ResponseStore:
"""
In-memory LRU store for Responses API state.
Each stored response includes the full internal conversation history
(with tool calls and results) so it can be reconstructed on subsequent
requests via previous_response_id.
"""
def __init__(self, max_size: int = MAX_STORED_RESPONSES):
self._store: collections.OrderedDict[str, Dict[str, Any]] = collections.OrderedDict()
self._max_size = max_size
def get(self, response_id: str) -> Optional[Dict[str, Any]]:
"""Retrieve a stored response by ID (moves to end for LRU)."""
if response_id in self._store:
self._store.move_to_end(response_id)
return self._store[response_id]
return None
def put(self, response_id: str, data: Dict[str, Any]) -> None:
"""Store a response, evicting the oldest if at capacity."""
if response_id in self._store:
self._store.move_to_end(response_id)
self._store[response_id] = data
while len(self._store) > self._max_size:
self._store.popitem(last=False)
def delete(self, response_id: str) -> bool:
"""Remove a response from the store. Returns True if found and deleted."""
if response_id in self._store:
del self._store[response_id]
return True
return False
def __len__(self) -> int:
return len(self._store)
# ---------------------------------------------------------------------------
# CORS middleware
# ---------------------------------------------------------------------------
_CORS_HEADERS = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, DELETE, OPTIONS",
"Access-Control-Allow-Headers": "Authorization, Content-Type",
}
if AIOHTTP_AVAILABLE:
@web.middleware
async def cors_middleware(request, handler):
"""Add CORS headers to every response; handle OPTIONS preflight."""
if request.method == "OPTIONS":
return web.Response(status=200, headers=_CORS_HEADERS)
response = await handler(request)
response.headers.update(_CORS_HEADERS)
return response
else:
cors_middleware = None # type: ignore[assignment]
class APIServerAdapter(BasePlatformAdapter):
"""
OpenAI-compatible HTTP API server adapter.
Runs an aiohttp web server that accepts OpenAI-format requests
and routes them through hermes-agent's AIAgent.
"""
def __init__(self, config: PlatformConfig):
super().__init__(config, Platform.API_SERVER)
extra = config.extra or {}
self._host: str = extra.get("host", os.getenv("API_SERVER_HOST", DEFAULT_HOST))
self._port: int = int(extra.get("port", os.getenv("API_SERVER_PORT", str(DEFAULT_PORT))))
self._api_key: str = extra.get("key", os.getenv("API_SERVER_KEY", ""))
self._app: Optional["web.Application"] = None
self._runner: Optional["web.AppRunner"] = None
self._site: Optional["web.TCPSite"] = None
self._response_store = ResponseStore()
# Conversation name → latest response_id mapping
self._conversations: Dict[str, str] = {}
# ------------------------------------------------------------------
# Auth helper
# ------------------------------------------------------------------
def _check_auth(self, request: "web.Request") -> Optional["web.Response"]:
"""
Validate Bearer token from Authorization header.
Returns None if auth is OK, or a 401 web.Response on failure.
If no API key is configured, all requests are allowed.
"""
if not self._api_key:
return None # No key configured — allow all (local-only use)
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header[7:].strip()
if token == self._api_key:
return None # Auth OK
return web.json_response(
{"error": {"message": "Invalid API key", "type": "invalid_request_error", "code": "invalid_api_key"}},
status=401,
)
# ------------------------------------------------------------------
# Agent creation helper
# ------------------------------------------------------------------
def _create_agent(
self,
ephemeral_system_prompt: Optional[str] = None,
session_id: Optional[str] = None,
stream_callback=None,
) -> Any:
"""
Create an AIAgent instance using the gateway's runtime config.
Uses _resolve_runtime_agent_kwargs() to pick up model, api_key,
base_url, etc. from config.yaml / env vars.
"""
from run_agent import AIAgent
from gateway.run import _resolve_runtime_agent_kwargs, _resolve_model
runtime_kwargs = _resolve_runtime_agent_kwargs()
model = _resolve_model()
max_iterations = int(os.getenv("HERMES_MAX_ITERATIONS", "90"))
agent = AIAgent(
model=model,
**runtime_kwargs,
max_iterations=max_iterations,
quiet_mode=True,
verbose_logging=False,
ephemeral_system_prompt=ephemeral_system_prompt or None,
session_id=session_id,
platform="api_server",
stream_callback=stream_callback,
)
return agent
# ------------------------------------------------------------------
# HTTP Handlers
# ------------------------------------------------------------------
async def _handle_health(self, request: "web.Request") -> "web.Response":
"""GET /health — simple health check."""
return web.json_response({"status": "ok", "platform": "hermes-agent"})
async def _handle_models(self, request: "web.Request") -> "web.Response":
"""GET /v1/models — return hermes-agent as an available model."""
auth_err = self._check_auth(request)
if auth_err:
return auth_err
return web.json_response({
"object": "list",
"data": [
{
"id": "hermes-agent",
"object": "model",
"created": int(time.time()),
"owned_by": "hermes",
"permission": [],
"root": "hermes-agent",
"parent": None,
}
],
})
async def _handle_chat_completions(self, request: "web.Request") -> "web.Response":
"""POST /v1/chat/completions — OpenAI Chat Completions format."""
auth_err = self._check_auth(request)
if auth_err:
return auth_err
# Parse request body
try:
body = await request.json()
except (json.JSONDecodeError, Exception):
return web.json_response(
{"error": {"message": "Invalid JSON in request body", "type": "invalid_request_error"}},
status=400,
)
messages = body.get("messages")
if not messages or not isinstance(messages, list):
return web.json_response(
{"error": {"message": "Missing or invalid 'messages' field", "type": "invalid_request_error"}},
status=400,
)
stream = body.get("stream", False)
# Extract system message (becomes ephemeral system prompt layered ON TOP of core)
system_prompt = None
conversation_messages: List[Dict[str, str]] = []
for msg in messages:
role = msg.get("role", "")
content = msg.get("content", "")
if role == "system":
# Accumulate system messages
if system_prompt is None:
system_prompt = content
else:
system_prompt = system_prompt + "\n" + content
elif role in ("user", "assistant"):
conversation_messages.append({"role": role, "content": content})
# Extract the last user message as the primary input
user_message = ""
history = []
if conversation_messages:
user_message = conversation_messages[-1].get("content", "")
history = conversation_messages[:-1]
if not user_message:
return web.json_response(
{"error": {"message": "No user message found in messages", "type": "invalid_request_error"}},
status=400,
)
session_id = str(uuid.uuid4())
completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
model_name = body.get("model", "hermes-agent")
created = int(time.time())
if stream:
import queue as _q
_stream_q = _q.Queue()
def _on_api_token(delta):
_stream_q.put(delta) # None = done
# Start agent in background
agent_task = asyncio.ensure_future(self._run_agent(
user_message=user_message,
conversation_history=history,
ephemeral_system_prompt=system_prompt,
session_id=session_id,
stream_callback=_on_api_token,
))
return await self._write_real_sse_chat_completion(
request, completion_id, model_name, created, _stream_q, agent_task
)
# Non-streaming: run the agent and return full response
try:
result, usage = await self._run_agent(
user_message=user_message,
conversation_history=history,
ephemeral_system_prompt=system_prompt,
session_id=session_id,
)
except Exception as e:
logger.error("Error running agent for chat completions: %s", e, exc_info=True)
return web.json_response(
{"error": {"message": f"Internal server error: {e}", "type": "server_error"}},
status=500,
)
final_response = result.get("final_response", "")
if not final_response:
final_response = result.get("error", "(No response generated)")
response_data = {
"id": completion_id,
"object": "chat.completion",
"created": created,
"model": model_name,
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": final_response,
},
"finish_reason": "stop",
}
],
"usage": {
"prompt_tokens": usage.get("input_tokens", 0),
"completion_tokens": usage.get("output_tokens", 0),
"total_tokens": usage.get("total_tokens", 0),
},
}
return web.json_response(response_data)
async def _write_real_sse_chat_completion(
self, request: "web.Request", completion_id: str, model: str,
created: int, stream_q, agent_task,
) -> "web.StreamResponse":
"""Write real streaming SSE from agent's stream_callback queue."""
import queue as _q
response = web.StreamResponse(
status=200,
headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache"},
)
await response.prepare(request)
# Role chunk
role_chunk = {
"id": completion_id, "object": "chat.completion.chunk",
"created": created, "model": model,
"choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}],
}
await response.write(f"data: {json.dumps(role_chunk)}\n\n".encode())
# Stream content chunks as they arrive from the agent
loop = asyncio.get_event_loop()
while True:
try:
delta = await loop.run_in_executor(None, lambda: stream_q.get(timeout=0.5))
except _q.Empty:
if agent_task.done():
break
continue
if delta is None: # End of stream
break
content_chunk = {
"id": completion_id, "object": "chat.completion.chunk",
"created": created, "model": model,
"choices": [{"index": 0, "delta": {"content": delta}, "finish_reason": None}],
}
await response.write(f"data: {json.dumps(content_chunk)}\n\n".encode())
# Get usage from completed agent
usage = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
try:
result, agent_usage = await agent_task
usage = agent_usage or usage
except Exception:
pass
# Finish chunk
finish_chunk = {
"id": completion_id, "object": "chat.completion.chunk",
"created": created, "model": model,
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
"usage": {
"prompt_tokens": usage.get("input_tokens", 0),
"completion_tokens": usage.get("output_tokens", 0),
"total_tokens": usage.get("total_tokens", 0),
},
}
await response.write(f"data: {json.dumps(finish_chunk)}\n\n".encode())
await response.write(b"data: [DONE]\n\n")
return response
async def _handle_responses(self, request: "web.Request") -> "web.Response":
"""POST /v1/responses — OpenAI Responses API format."""
auth_err = self._check_auth(request)
if auth_err:
return auth_err
# Parse request body
try:
body = await request.json()
except (json.JSONDecodeError, Exception):
return web.json_response(
{"error": {"message": "Invalid JSON in request body", "type": "invalid_request_error"}},
status=400,
)
raw_input = body.get("input")
if raw_input is None:
return web.json_response(
{"error": {"message": "Missing 'input' field", "type": "invalid_request_error"}},
status=400,
)
instructions = body.get("instructions")
previous_response_id = body.get("previous_response_id")
conversation = body.get("conversation")
store = body.get("store", True)
# conversation and previous_response_id are mutually exclusive
if conversation and previous_response_id:
return web.json_response(
{"error": {"message": "Cannot use both 'conversation' and 'previous_response_id'", "type": "invalid_request_error"}},
status=400,
)
# Resolve conversation name to latest response_id
if conversation:
previous_response_id = self._conversations.get(conversation)
# No error if conversation doesn't exist yet — it's a new conversation
# Normalize input to message list
input_messages: List[Dict[str, str]] = []
if isinstance(raw_input, str):
input_messages = [{"role": "user", "content": raw_input}]
elif isinstance(raw_input, list):
for item in raw_input:
if isinstance(item, str):
input_messages.append({"role": "user", "content": item})
elif isinstance(item, dict):
role = item.get("role", "user")
content = item.get("content", "")
# Handle content that may be a list of content parts
if isinstance(content, list):
text_parts = []
for part in content:
if isinstance(part, dict) and part.get("type") == "input_text":
text_parts.append(part.get("text", ""))
elif isinstance(part, dict) and part.get("type") == "output_text":
text_parts.append(part.get("text", ""))
elif isinstance(part, str):
text_parts.append(part)
content = "\n".join(text_parts)
input_messages.append({"role": role, "content": content})
else:
return web.json_response(
{"error": {"message": "'input' must be a string or array", "type": "invalid_request_error"}},
status=400,
)
# Reconstruct conversation history from previous_response_id
conversation_history: List[Dict[str, str]] = []
if previous_response_id:
stored = self._response_store.get(previous_response_id)
if stored is None:
return web.json_response(
{"error": {"message": f"Previous response not found: {previous_response_id}", "type": "invalid_request_error"}},
status=404,
)
conversation_history = list(stored.get("conversation_history", []))
# If no instructions provided, carry forward from previous
if instructions is None:
instructions = stored.get("instructions")
# Append new input messages to history (all but the last become history)
for msg in input_messages[:-1]:
conversation_history.append(msg)
# Last input message is the user_message
user_message = input_messages[-1].get("content", "") if input_messages else ""
if not user_message:
return web.json_response(
{"error": {"message": "No user message found in input", "type": "invalid_request_error"}},
status=400,
)
# Truncation support
if body.get("truncation") == "auto" and len(conversation_history) > 100:
conversation_history = conversation_history[-100:]
# Run the agent
session_id = str(uuid.uuid4())
try:
result, usage = await self._run_agent(
user_message=user_message,
conversation_history=conversation_history,
ephemeral_system_prompt=instructions,
session_id=session_id,
)
except Exception as e:
logger.error("Error running agent for responses: %s", e, exc_info=True)
return web.json_response(
{"error": {"message": f"Internal server error: {e}", "type": "server_error"}},
status=500,
)
final_response = result.get("final_response", "")
if not final_response:
final_response = result.get("error", "(No response generated)")
response_id = f"resp_{uuid.uuid4().hex[:28]}"
created_at = int(time.time())
# Build the full conversation history for storage
# (includes tool calls from the agent run)
full_history = list(conversation_history)
full_history.append({"role": "user", "content": user_message})
# Add agent's internal messages if available
agent_messages = result.get("messages", [])
if agent_messages:
full_history.extend(agent_messages)
else:
full_history.append({"role": "assistant", "content": final_response})
# Build output items (includes tool calls + final message)
output_items = self._extract_output_items(result)
response_data = {
"id": response_id,
"object": "response",
"status": "completed",
"created_at": created_at,
"model": body.get("model", "hermes-agent"),
"output": output_items,
"usage": {
"input_tokens": usage.get("input_tokens", 0),
"output_tokens": usage.get("output_tokens", 0),
"total_tokens": usage.get("total_tokens", 0),
},
}
# Store the complete response object for future chaining / GET retrieval
if store:
self._response_store.put(response_id, {
"response": response_data,
"conversation_history": full_history,
"instructions": instructions,
})
# Update conversation mapping so the next request with the same
# conversation name automatically chains to this response
if conversation:
self._conversations[conversation] = response_id
return web.json_response(response_data)
# ------------------------------------------------------------------
# Agent execution
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# GET / DELETE response endpoints
# ------------------------------------------------------------------
async def _handle_get_response(self, request: "web.Request") -> "web.Response":
"""GET /v1/responses/{response_id} — retrieve a stored response."""
auth_err = self._check_auth(request)
if auth_err:
return auth_err
response_id = request.match_info["response_id"]
stored = self._response_store.get(response_id)
if stored is None:
return web.json_response(
{"error": {"message": f"Response not found: {response_id}", "type": "invalid_request_error"}},
status=404,
)
return web.json_response(stored["response"])
async def _handle_delete_response(self, request: "web.Request") -> "web.Response":
"""DELETE /v1/responses/{response_id} — delete a stored response."""
auth_err = self._check_auth(request)
if auth_err:
return auth_err
response_id = request.match_info["response_id"]
deleted = self._response_store.delete(response_id)
if not deleted:
return web.json_response(
{"error": {"message": f"Response not found: {response_id}", "type": "invalid_request_error"}},
status=404,
)
return web.json_response({
"id": response_id,
"object": "response",
"deleted": True,
})
# ------------------------------------------------------------------
# Output extraction helper
# ------------------------------------------------------------------
@staticmethod
def _extract_output_items(result: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Build the full output item array from the agent's messages.
Walks *result["messages"]* and emits:
- ``function_call`` items for each tool_call on assistant messages
- ``function_call_output`` items for each tool-role message
- a final ``message`` item with the assistant's text reply
"""
items: List[Dict[str, Any]] = []
messages = result.get("messages", [])
for msg in messages:
role = msg.get("role")
if role == "assistant" and msg.get("tool_calls"):
for tc in msg["tool_calls"]:
func = tc.get("function", {})
items.append({
"type": "function_call",
"name": func.get("name", ""),
"arguments": func.get("arguments", ""),
"call_id": tc.get("id", ""),
})
elif role == "tool":
items.append({
"type": "function_call_output",
"call_id": msg.get("tool_call_id", ""),
"output": msg.get("content", ""),
})
# Final assistant message
final = result.get("final_response", "")
if not final:
final = result.get("error", "(No response generated)")
items.append({
"type": "message",
"role": "assistant",
"content": [
{
"type": "output_text",
"text": final,
}
],
})
return items
# ------------------------------------------------------------------
# Agent execution
# ------------------------------------------------------------------
async def _run_agent(
self,
user_message: str,
conversation_history: List[Dict[str, str]],
ephemeral_system_prompt: Optional[str] = None,
session_id: Optional[str] = None,
stream_callback=None,
) -> tuple:
"""
Create an agent and run a conversation in a thread executor.
Returns ``(result_dict, usage_dict)`` where *usage_dict* contains
``input_tokens``, ``output_tokens`` and ``total_tokens``.
"""
loop = asyncio.get_event_loop()
def _run():
agent = self._create_agent(
ephemeral_system_prompt=ephemeral_system_prompt,
session_id=session_id,
stream_callback=stream_callback,
)
result = agent.run_conversation(
user_message=user_message,
conversation_history=conversation_history,
)
usage = {
"input_tokens": getattr(agent, "session_prompt_tokens", 0) or 0,
"output_tokens": getattr(agent, "session_completion_tokens", 0) or 0,
"total_tokens": getattr(agent, "session_total_tokens", 0) or 0,
}
return result, usage
return await loop.run_in_executor(None, _run)
# ------------------------------------------------------------------
# BasePlatformAdapter interface
# ------------------------------------------------------------------
async def connect(self) -> bool:
"""Start the aiohttp web server."""
if not AIOHTTP_AVAILABLE:
logger.warning("[%s] aiohttp not installed", self.name)
return False
try:
self._app = web.Application(middlewares=[cors_middleware])
self._app.router.add_get("/health", self._handle_health)
self._app.router.add_get("/v1/models", self._handle_models)
self._app.router.add_post("/v1/chat/completions", self._handle_chat_completions)
self._app.router.add_post("/v1/responses", self._handle_responses)
self._app.router.add_get("/v1/responses/{response_id}", self._handle_get_response)
self._app.router.add_delete("/v1/responses/{response_id}", self._handle_delete_response)
self._runner = web.AppRunner(self._app)
await self._runner.setup()
self._site = web.TCPSite(self._runner, self._host, self._port)
await self._site.start()
self._running = True
logger.info(
"[%s] API server listening on http://%s:%d",
self.name, self._host, self._port,
)
return True
except Exception as e:
logger.error("[%s] Failed to start API server: %s", self.name, e)
return False
async def disconnect(self) -> None:
"""Stop the aiohttp web server."""
self._running = False
if self._site:
await self._site.stop()
self._site = None
if self._runner:
await self._runner.cleanup()
self._runner = None
self._app = None
logger.info("[%s] API server stopped", self.name)
async def send(
self,
chat_id: str,
content: str,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""
Not used — HTTP request/response cycle handles delivery directly.
"""
return SendResult(success=False, error="API server uses HTTP request/response, not send()")
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
"""Return basic info about the API server."""
return {
"name": "API Server",
"type": "api",
"host": self._host,
"port": self._port,
}

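Since the adapter speaks the standard OpenAI wire format, any OpenAI SDK can point at it. A minimal client sketch, assuming the default bind address and port (127.0.0.1:8642), an `API_SERVER_KEY` of `secret`, and the `openai` and `requests` packages installed:

```python
from openai import OpenAI
import requests

client = OpenAI(base_url="http://127.0.0.1:8642/v1", api_key="secret")  # key = API_SERVER_KEY

# Stateless Chat Completions (add stream=True to consume SSE chunks instead).
chat = client.chat.completions.create(
    model="hermes-agent",
    messages=[{"role": "user", "content": "What tools do you have available?"}],
)
print(chat.choices[0].message.content)

# Stateful Responses API: chain turns via previous_response_id (or a named 'conversation').
headers = {"Authorization": "Bearer secret"}
r1 = requests.post("http://127.0.0.1:8642/v1/responses", headers=headers,
                   json={"input": "Remember that my favorite color is teal."}).json()
r2 = requests.post("http://127.0.0.1:8642/v1/responses", headers=headers,
                   json={"input": "What is my favorite color?",
                         "previous_response_id": r1["id"]}).json()
print(r2["output"][-1]["content"][0]["text"])
```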
View File

@@ -101,15 +101,17 @@ class TelegramAdapter(BasePlatformAdapter):
- Sending responses with Telegram markdown
- Forum topics (thread_id support)
- Media messages
- Reply threading modes (off/first/all)
"""
# Telegram message limits
MAX_MESSAGE_LENGTH = 4096
def __init__(self, config: PlatformConfig):
super().__init__(config, Platform.TELEGRAM)
self._app: Optional[Application] = None
self._bot: Optional[Bot] = None
self._reply_to_mode: str = getattr(config, 'reply_to_mode', 'first') or 'first'
self._delivery_progress: Dict[str, bool] = {}
async def connect(self) -> bool:
"""Connect to Telegram and start polling for updates."""
@@ -206,6 +208,29 @@ class TelegramAdapter(BasePlatformAdapter):
self._bot = None
logger.info("[%s] Disconnected from Telegram", self.name)
def _should_thread_reply(self, chat_id: str, reply_to: Optional[str], chunk_index: int) -> bool:
"""
Determine if this message chunk should thread to the original message.
Args:
chat_id: The chat ID
reply_to: The original message ID to reply to
chunk_index: Index of this chunk (0 = first chunk)
Returns:
True if this chunk should be threaded to the original message
"""
if not reply_to:
return False
mode = self._reply_to_mode
if mode == "off":
return False
elif mode == "all":
return True
else: # "first" (default)
return chunk_index == 0
async def send(
self,
chat_id: str,
@@ -218,7 +243,6 @@ class TelegramAdapter(BasePlatformAdapter):
return SendResult(success=False, error="Not connected")
try:
# Format and split message if needed
formatted = self.format_message(content)
chunks = self.truncate_message(formatted, self.MAX_MESSAGE_LENGTH)
@@ -226,31 +250,30 @@ class TelegramAdapter(BasePlatformAdapter):
thread_id = metadata.get("thread_id") if metadata else None
for i, chunk in enumerate(chunks):
# Try Markdown first, fall back to plain text if it fails
should_thread = self._should_thread_reply(chat_id, reply_to, i)
reply_to_id = int(reply_to) if should_thread else None
try:
msg = await self._bot.send_message(
chat_id=int(chat_id),
text=chunk,
parse_mode=ParseMode.MARKDOWN_V2,
reply_to_message_id=int(reply_to) if reply_to and i == 0 else None,
reply_to_message_id=reply_to_id,
message_thread_id=int(thread_id) if thread_id else None,
)
except Exception as md_error:
# Markdown parsing failed, try plain text
if "parse" in str(md_error).lower() or "markdown" in str(md_error).lower():
logger.warning("[%s] MarkdownV2 parse failed, falling back to plain text: %s", self.name, md_error)
# Strip MDV2 escape backslashes so the user doesn't
# see raw backslashes littered through the message.
plain_chunk = _strip_mdv2(chunk)
msg = await self._bot.send_message(
chat_id=int(chat_id),
text=plain_chunk,
parse_mode=None, # Plain text
reply_to_message_id=int(reply_to) if reply_to and i == 0 else None,
parse_mode=None,
reply_to_message_id=reply_to_id,
message_thread_id=int(thread_id) if thread_id else None,
)
else:
raise # Re-raise if not a parse error
raise
message_ids.append(str(msg.message_id))
return SendResult(

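A quick illustration of the resulting behavior, grounded in the `_should_thread_reply` rules above and the tests added later in this diff (requires `python-telegram-bot` to be importable; the tests mock it instead):

```python
from gateway.config import PlatformConfig
from gateway.platforms.telegram import TelegramAdapter

for mode in ("off", "first", "all"):
    adapter = TelegramAdapter(PlatformConfig(enabled=True, token="123:abc", reply_to_mode=mode))
    print(mode, [adapter._should_thread_reply("12345", "999", i) for i in range(3)])

# off   [False, False, False]   -> no chunk replies to the user's message
# first [True, False, False]    -> only the first chunk replies (default)
# all   [True, True, True]      -> every chunk replies
```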
View File

@@ -19,6 +19,7 @@ import os
import re
import sys
import signal
import time
import threading
from logging.handlers import RotatingFileHandler
from pathlib import Path
@@ -165,6 +166,28 @@ from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageTyp
logger = logging.getLogger(__name__)
def _resolve_model() -> str:
"""Resolve the model name from env vars and config.yaml.
Priority: HERMES_MODEL env > LLM_MODEL env > config.yaml model.default > fallback.
"""
model = os.getenv("HERMES_MODEL") or os.getenv("LLM_MODEL") or "anthropic/claude-opus-4.6"
try:
import yaml
_cfg_path = Path.home() / ".hermes" / "config.yaml"
if _cfg_path.exists():
with open(_cfg_path, encoding="utf-8") as f:
cfg = yaml.safe_load(f) or {}
model_cfg = cfg.get("model", {})
if isinstance(model_cfg, str):
model = model_cfg
elif isinstance(model_cfg, dict):
model = model_cfg.get("default", model)
except Exception:
pass
return model
def _resolve_runtime_agent_kwargs() -> dict:
"""Resolve provider credentials for gateway-created AIAgent instances."""
from hermes_cli.runtime_provider import (
@@ -206,6 +229,7 @@ class GatewayRunner:
self._reasoning_config = self._load_reasoning_config()
self._provider_routing = self._load_provider_routing()
self._fallback_model = self._load_fallback_model()
self._streaming_config = self._load_streaming_config()
# Wire process registry into session store for reset protection
from tools.process_registry import process_registry
@@ -460,6 +484,40 @@ class GatewayRunner:
pass
return None
@staticmethod
def _load_streaming_config() -> dict:
"""Load streaming config from config.yaml at startup.
Returns a dict like {"enabled": False, "telegram": True, ...}.
Per-platform keys override the global 'enabled' flag.
The HERMES_STREAMING_ENABLED env var overrides everything.
"""
config = {"enabled": False}
try:
import yaml as _y
cfg_path = _hermes_home / "config.yaml"
if cfg_path.exists():
with open(cfg_path, encoding="utf-8") as _f:
cfg = _y.safe_load(_f) or {}
s_cfg = cfg.get("streaming", {})
if isinstance(s_cfg, dict):
config = s_cfg
except Exception:
pass
# Env var override
if os.getenv("HERMES_STREAMING_ENABLED", "").lower() in ("true", "1", "yes"):
config["enabled"] = True
return config
def _is_streaming_enabled(self, platform_key: str) -> bool:
"""Check if streaming is enabled for a given platform."""
cfg = self._streaming_config
# Per-platform override
if platform_key and cfg.get(platform_key) is not None:
return str(cfg[platform_key]).lower() in ("true", "1", "yes")
# Global default
return str(cfg.get("enabled", False)).lower() in ("true", "1", "yes")
async def start(self) -> bool:
"""
Start the gateway and all configured platform adapters.
@@ -679,6 +737,13 @@ class GatewayRunner:
return None
return EmailAdapter(config)
elif platform == Platform.API_SERVER:
from gateway.platforms.api_server import APIServerAdapter, check_api_server_requirements
if not check_api_server_requirements():
logger.warning("API Server: aiohttp not installed")
return None
return APIServerAdapter(config)
return None
def _is_user_authorized(self, source: SessionSource) -> bool:
@@ -698,6 +763,11 @@ class GatewayRunner:
if source.platform == Platform.HOMEASSISTANT:
return True
# API Server handles auth at the HTTP layer (Bearer token),
# so requests that reach the gateway runner are already authorized.
if source.platform == Platform.API_SERVER:
return True
user_id = source.user_id
if not user_id:
return False
@@ -709,6 +779,7 @@ class GatewayRunner:
Platform.SLACK: "SLACK_ALLOWED_USERS",
Platform.SIGNAL: "SIGNAL_ALLOWED_USERS",
Platform.EMAIL: "EMAIL_ALLOWED_USERS",
Platform.API_SERVER: "API_SERVER_ALLOWED_KEYS",
}
platform_allow_all_map = {
Platform.TELEGRAM: "TELEGRAM_ALLOW_ALL_USERS",
@@ -717,6 +788,7 @@ class GatewayRunner:
Platform.SLACK: "SLACK_ALLOW_ALL_USERS",
Platform.SIGNAL: "SIGNAL_ALLOW_ALL_USERS",
Platform.EMAIL: "EMAIL_ALLOW_ALL_USERS",
Platform.API_SERVER: "API_SERVER_ALLOW_ALL",
}
# Per-platform allow-all flag (e.g., DISCORD_ALLOW_ALL_USERS=true)
@@ -1408,6 +1480,23 @@ class GatewayRunner:
session_entry.session_key,
last_prompt_tokens=agent_result.get("last_prompt_tokens", 0),
)
# If streaming already delivered the response via progressive edits,
# do a final edit with the post-processed text and suppress the
# normal send to avoid duplicating the message.
_streamed_id = agent_result.get("_streamed_msg_id")
if _streamed_id and response:
adapter = self.adapters.get(source.platform)
if adapter:
try:
await adapter.edit_message(
chat_id=source.chat_id,
message_id=_streamed_id,
content=response,
)
except Exception:
pass
return "" # Suppress normal send in base.py
return response
@@ -3047,7 +3136,18 @@ class GatewayRunner:
agent_holder = [None] # Mutable container for the agent instance
result_holder = [None] # Mutable container for the result
tools_holder = [None] # Mutable container for the tool definitions
# ── Streaming setup ─────────────────────────────────────────────
_stream_q = None
_stream_done = None
_stream_msg_id = [None]
_platform_key = source.platform.value if source.platform else ""
_streaming_enabled = self._is_streaming_enabled(_platform_key)
if _streaming_enabled:
_stream_q = queue.Queue()
_stream_done = threading.Event()
# Bridge sync step_callback → async hooks.emit for agent:step events
_loop_for_step = asyncio.get_event_loop()
_hooks_ref = self.hooks
@@ -3120,6 +3220,19 @@ class GatewayRunner:
}
pr = self._provider_routing
# Streaming: build callback that feeds the async queue
_on_stream_token = None
if _stream_q is not None:
_sq = _stream_q # capture for closure
_sd = _stream_done
def _on_stream_token(delta):
if delta is None:
if _sd:
_sd.set()
else:
_sq.put(delta)
agent = AIAgent(
model=model,
**runtime_kwargs,
@@ -3137,6 +3250,7 @@ class GatewayRunner:
provider_require_parameters=pr.get("require_parameters", False),
provider_data_collection=pr.get("data_collection"),
session_id=session_id,
stream_callback=_on_stream_token,
tool_progress_callback=progress_callback if tool_progress_enabled else None,
step_callback=_step_callback_sync if _hooks_ref.loaded_hooks else None,
platform=platform_key,
@@ -3263,7 +3377,7 @@ class GatewayRunner:
unique_tags.insert(0, "[[audio_as_voice]]")
final_response = final_response + "\n" + "\n".join(unique_tags)
return {
_result_dict = {
"final_response": final_response,
"messages": result_holder[0].get("messages", []) if result_holder[0] else [],
"api_calls": result_holder[0].get("api_calls", 0) if result_holder[0] else 0,
@@ -3271,12 +3385,86 @@ class GatewayRunner:
"history_offset": len(agent_history),
"last_prompt_tokens": _last_prompt_toks,
}
if _stream_msg_id[0]:
_result_dict["_streamed_msg_id"] = _stream_msg_id[0]
return _result_dict
# Start progress message sender if enabled
progress_task = None
if tool_progress_enabled:
progress_task = asyncio.create_task(send_progress_messages())
# ── Stream preview: progressively edit a message with streaming tokens ──
async def stream_preview():
if not _stream_q or not _stream_done:
return
adapter = self.adapters.get(source.platform)
if not adapter:
return
accumulated = []
token_count = 0
last_edit = 0.0
MIN_TOKENS = 20
EDIT_INTERVAL = 1.5
_metadata = {"thread_id": source.thread_id} if source.thread_id else None
try:
while not _stream_done.is_set():
try:
chunk = _stream_q.get(timeout=0.1)
accumulated.append(chunk)
token_count += 1
except Exception:
continue
now = time.monotonic()
if token_count >= MIN_TOKENS and (now - last_edit) >= EDIT_INTERVAL:
preview = "".join(accumulated) + ""
if _stream_msg_id[0] is None:
r = await adapter.send(
chat_id=source.chat_id, content=preview,
metadata=_metadata,
)
if r.success and r.message_id:
_stream_msg_id[0] = r.message_id
else:
await adapter.edit_message(
chat_id=source.chat_id,
message_id=_stream_msg_id[0],
content=preview,
)
last_edit = now
# Drain remaining
while not _stream_q.empty():
try:
accumulated.append(_stream_q.get_nowait())
except Exception:
break
# Final edit: remove cursor
if _stream_msg_id[0] and accumulated:
await adapter.edit_message(
chat_id=source.chat_id,
message_id=_stream_msg_id[0],
content="".join(accumulated),
)
except asyncio.CancelledError:
if _stream_msg_id[0] and accumulated:
try:
await adapter.edit_message(
chat_id=source.chat_id,
message_id=_stream_msg_id[0],
content="".join(accumulated),
)
except Exception:
pass
except Exception as e:
logger.debug("stream_preview error: %s", e)
stream_task = asyncio.create_task(stream_preview()) if _stream_q else None
# Track this agent as running for this session (for interrupt support)
# We do this in a callback after the agent is created
async def track_agent():
@@ -3351,9 +3539,11 @@ class GatewayRunner:
session_key=session_key
)
finally:
# Stop progress sender and interrupt monitor
# Stop progress sender, stream preview, and interrupt monitor
if progress_task:
progress_task.cancel()
if stream_task:
stream_task.cancel()
interrupt_monitor.cancel()
# Clean up tracking
@@ -3362,7 +3552,7 @@ class GatewayRunner:
del self._running_agents[session_key]
# Wait for cancelled tasks
for task in [progress_task, interrupt_monitor, tracking_task]:
for task in [progress_task, stream_task, interrupt_monitor, tracking_task]:
if task:
try:
await task

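The streaming path hinges on a small thread-to-asyncio bridge: the agent's `stream_callback` runs in a worker thread and pushes deltas onto a `queue.Queue`, while an async task drains the queue, posts a preview message once enough tokens have arrived, and edits it on an interval. A stripped-down sketch of that pattern, with the platform `send`/`edit` coroutines passed in as stand-ins:

```python
import queue, threading, time

def make_stream_bridge():
    q, done = queue.Queue(), threading.Event()

    def on_token(delta):              # called from the agent's worker thread
        if delta is None:             # end-of-stream signal
            done.set()
        else:
            q.put(delta)

    async def preview(send, edit, min_tokens=20, edit_interval=1.5):
        buf, msg_id, last = [], None, 0.0
        while not done.is_set():
            try:
                buf.append(q.get(timeout=0.1))   # short blocking poll, as in stream_preview
            except queue.Empty:
                continue
            now = time.monotonic()
            if len(buf) >= min_tokens and now - last >= edit_interval:
                text = "".join(buf)
                if msg_id is None:
                    msg_id = await send(text)    # first preview message
                else:
                    await edit(msg_id, text)     # subsequent progressive edits
                last = now
        while not q.empty():                     # drain whatever is left
            buf.append(q.get_nowait())
        if msg_id and buf:
            await edit(msg_id, "".join(buf))     # final edit, no cursor

    return on_token, preview
```

The real `stream_preview` additionally appends a cursor glyph to intermediate edits, handles cancellation, and reports the streamed message id back so the gateway can suppress the normal send.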
View File

@@ -494,6 +494,38 @@ OPTIONAL_ENV_VARS = {
"advanced": True,
},
# ── API Server ──
"API_SERVER_ENABLED": {
"description": "Enable the OpenAI-compatible API server (true/false). Allows frontends like Open WebUI to connect.",
"prompt": "Enable API server (true/false)",
"url": None,
"password": False,
"category": "messaging",
},
"API_SERVER_KEY": {
"description": "Bearer token for API server authentication. If not set, all requests are allowed (local-only use).",
"prompt": "API server auth key (leave empty for no auth)",
"url": None,
"password": True,
"category": "messaging",
},
"API_SERVER_PORT": {
"description": "Port for the API server (default: 8642).",
"prompt": "API server port",
"url": None,
"password": False,
"category": "messaging",
"advanced": True,
},
"API_SERVER_HOST": {
"description": "Bind address for the API server (default: 127.0.0.1). Use 0.0.0.0 for network access (set API_SERVER_KEY!).",
"prompt": "API server bind address",
"url": None,
"password": False,
"category": "messaging",
"advanced": True,
},
# ── Agent settings ──
"MESSAGING_CWD": {
"description": "Working directory for terminal commands via messaging",

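For reference, a minimal environment for exposing the API server beyond localhost might look like the following (values illustrative; variable names are the ones registered above):

```python
import os

# Exposing beyond 127.0.0.1 without API_SERVER_KEY leaves the endpoint unauthenticated.
os.environ["API_SERVER_ENABLED"] = "true"
os.environ["API_SERVER_KEY"] = "change-me"   # Bearer token clients must present
os.environ["API_SERVER_HOST"] = "0.0.0.0"    # default: 127.0.0.1
os.environ["API_SERVER_PORT"] = "8642"       # default: 8642
```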
View File

@@ -176,6 +176,7 @@ class AIAgent:
reasoning_callback: callable = None,
clarify_callback: callable = None,
step_callback: callable = None,
stream_callback: callable = None,
max_tokens: int = None,
reasoning_config: Dict[str, Any] = None,
prefill_messages: List[Dict[str, Any]] = None,
@@ -229,6 +230,9 @@ class AIAgent:
polluting trajectories with user-specific persona or project instructions.
honcho_session_key (str): Session key for Honcho integration (e.g., "telegram:123456" or CLI session_id).
When provided and Honcho is enabled in config, enables persistent cross-session user modeling.
stream_callback (callable): Optional callback(text_delta: str) invoked for each
text token during streaming LLM generation; the agent calls it with None as an end-of-stream signal when generation finishes.
When set, the agent uses stream=True for API calls. Disabled by default.
"""
self.model = model
self.max_iterations = max_iterations
@@ -264,6 +268,7 @@ class AIAgent:
self.reasoning_callback = reasoning_callback
self.clarify_callback = clarify_callback
self.step_callback = step_callback
self.stream_callback = stream_callback
self._last_reported_tool = None # Track for "new tool" mode
# Interrupt mechanism for breaking out of tool loops
@@ -2010,8 +2015,20 @@ class AIAgent:
for attempt in range(max_stream_retries + 1):
try:
with self.client.responses.stream(**api_kwargs) as stream:
for _ in stream:
pass
for event in stream:
if self.stream_callback and hasattr(event, 'type'):
if getattr(event, 'type', '') == 'response.output_text.delta':
delta_text = getattr(event, 'delta', '')
if delta_text:
try:
self.stream_callback(delta_text)
except Exception:
pass
if self.stream_callback:
try:
self.stream_callback(None)
except Exception:
pass
return stream.get_final_response()
except RuntimeError as exc:
err_text = str(exc)
@@ -2149,6 +2166,87 @@ class AIAgent:
return True
def _run_streaming_chat_completion(self, api_kwargs: dict):
"""Stream a chat completion, emitting text tokens via stream_callback.
Returns a SimpleNamespace response object compatible with the non-streaming
code path. Falls back to non-streaming on any error.
"""
stream_kwargs = dict(api_kwargs)
stream_kwargs["stream"] = True
# Request usage in the final chunk
stream_kwargs["stream_options"] = {"include_usage": True}
accumulated_content = []
accumulated_tool_calls = {}
final_usage = None
try:
stream = self.client.chat.completions.create(**stream_kwargs)
for chunk in stream:
if not chunk.choices:
if hasattr(chunk, 'usage') and chunk.usage:
final_usage = chunk.usage
continue
delta = chunk.choices[0].delta
if hasattr(delta, 'content') and delta.content:
accumulated_content.append(delta.content)
if self.stream_callback:
try:
self.stream_callback(delta.content)
except Exception:
pass
if hasattr(delta, 'tool_calls') and delta.tool_calls:
for tc_delta in delta.tool_calls:
idx = tc_delta.index
if idx not in accumulated_tool_calls:
accumulated_tool_calls[idx] = {"id": tc_delta.id or "", "name": "", "arguments": ""}
if hasattr(tc_delta, 'function') and tc_delta.function:
if getattr(tc_delta.function, 'name', None):
accumulated_tool_calls[idx]["name"] = tc_delta.function.name
if getattr(tc_delta.function, 'arguments', None):
accumulated_tool_calls[idx]["arguments"] += tc_delta.function.arguments
if self.stream_callback:
try:
self.stream_callback(None) # End signal
except Exception:
pass
tool_calls = []
for idx in sorted(accumulated_tool_calls):
tc = accumulated_tool_calls[idx]
if tc["name"]:
tool_calls.append(SimpleNamespace(
id=tc["id"], type="function",
function=SimpleNamespace(name=tc["name"], arguments=tc["arguments"]),
))
return SimpleNamespace(
choices=[SimpleNamespace(
message=SimpleNamespace(
content="".join(accumulated_content) or "",
tool_calls=tool_calls or None,
role="assistant",
),
finish_reason="tool_calls" if tool_calls else "stop",
)],
usage=final_usage,
model=self.model,
)
except Exception as e:
if self.stream_callback:
try:
self.stream_callback(None)
except Exception:
pass
logger.debug("Streaming chat completion failed, falling back: %s", e)
return self.client.chat.completions.create(**api_kwargs)
def _interruptible_api_call(self, api_kwargs: dict):
"""
Run the API call in a background thread so the main conversation loop
@@ -2164,6 +2262,8 @@ class AIAgent:
try:
if self.api_mode == "codex_responses":
result["response"] = self._run_codex_stream(api_kwargs)
elif self.stream_callback is not None:
result["response"] = self._run_streaming_chat_completion(api_kwargs)
else:
result["response"] = self.client.chat.completions.create(**api_kwargs)
except Exception as e:

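A minimal sketch of wiring a `stream_callback` straight into `AIAgent` (constructor arguments beyond the callback are elided; see the test fixtures below for a fuller construction):

```python
from run_agent import AIAgent

def on_delta(delta):
    if delta is None:                 # end-of-stream signal from the agent
        print()
    else:
        print(delta, end="", flush=True)

# Other arguments (model, api_key, ...) as in the existing call sites.
agent = AIAgent(api_key="sk-example", quiet_mode=True, stream_callback=on_delta)
result = agent.run_conversation(user_message="Hello!")
```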
File diff suppressed because it is too large

View File

@@ -0,0 +1,210 @@
"""Tests for Telegram reply_to_mode functionality.
Covers the threading behavior control for multi-chunk replies:
- "off": Never thread replies to original message
- "first": Only first chunk threads (default)
- "all": All chunks thread to original message
"""
import sys
from unittest.mock import MagicMock, AsyncMock, patch
import pytest
from gateway.config import PlatformConfig
def _ensure_telegram_mock():
"""Mock the telegram package if it's not installed."""
if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
return
mod = MagicMock()
mod.ext.ContextTypes.DEFAULT_TYPE = type(None)
mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"
mod.constants.ChatType.GROUP = "group"
mod.constants.ChatType.SUPERGROUP = "supergroup"
mod.constants.ChatType.CHANNEL = "channel"
mod.constants.ChatType.PRIVATE = "private"
for name in ("telegram", "telegram.ext", "telegram.constants"):
sys.modules.setdefault(name, mod)
_ensure_telegram_mock()
from gateway.platforms.telegram import TelegramAdapter # noqa: E402
@pytest.fixture()
def adapter_factory():
"""Factory to create TelegramAdapter with custom reply_to_mode."""
def create(reply_to_mode: str = "first"):
config = PlatformConfig(enabled=True, token="test-token", reply_to_mode=reply_to_mode)
return TelegramAdapter(config)
return create
class TestReplyToModeConfig:
"""Tests for reply_to_mode configuration loading."""
def test_default_mode_is_first(self, adapter_factory):
adapter = adapter_factory()
assert adapter._reply_to_mode == "first"
def test_off_mode(self, adapter_factory):
adapter = adapter_factory(reply_to_mode="off")
assert adapter._reply_to_mode == "off"
def test_first_mode(self, adapter_factory):
adapter = adapter_factory(reply_to_mode="first")
assert adapter._reply_to_mode == "first"
def test_all_mode(self, adapter_factory):
adapter = adapter_factory(reply_to_mode="all")
assert adapter._reply_to_mode == "all"
def test_invalid_mode_stored_as_is(self, adapter_factory):
"""Invalid modes are stored but _should_thread_reply handles them."""
adapter = adapter_factory(reply_to_mode="invalid")
assert adapter._reply_to_mode == "invalid"
def test_none_mode_defaults_to_first(self):
config = PlatformConfig(enabled=True, token="test-token")
adapter = TelegramAdapter(config)
assert adapter._reply_to_mode == "first"
def test_empty_string_mode_defaults_to_first(self):
config = PlatformConfig(enabled=True, token="test-token", reply_to_mode="")
adapter = TelegramAdapter(config)
assert adapter._reply_to_mode == "first"
class TestShouldThreadReply:
"""Tests for _should_thread_reply method."""
def test_no_reply_to_returns_false(self, adapter_factory):
adapter = adapter_factory(reply_to_mode="first")
assert adapter._should_thread_reply("12345", None, 0) is False
assert adapter._should_thread_reply("12345", "", 0) is False
def test_off_mode_never_threads(self, adapter_factory):
adapter = adapter_factory(reply_to_mode="off")
assert adapter._should_thread_reply("12345", "123", 0) is False
assert adapter._should_thread_reply("12345", "123", 1) is False
assert adapter._should_thread_reply("12345", "123", 5) is False
def test_first_mode_only_first_chunk(self, adapter_factory):
adapter = adapter_factory(reply_to_mode="first")
assert adapter._should_thread_reply("12345", "123", 0) is True
assert adapter._should_thread_reply("12345", "123", 1) is False
assert adapter._should_thread_reply("12345", "123", 2) is False
assert adapter._should_thread_reply("12345", "123", 10) is False
def test_all_mode_all_chunks(self, adapter_factory):
adapter = adapter_factory(reply_to_mode="all")
assert adapter._should_thread_reply("12345", "123", 0) is True
assert adapter._should_thread_reply("12345", "123", 1) is True
assert adapter._should_thread_reply("12345", "123", 2) is True
assert adapter._should_thread_reply("12345", "123", 10) is True
def test_invalid_mode_falls_back_to_first(self, adapter_factory):
"""Invalid mode behaves like 'first' - only first chunk threads."""
adapter = adapter_factory(reply_to_mode="invalid")
assert adapter._should_thread_reply("12345", "123", 0) is True
assert adapter._should_thread_reply("12345", "123", 1) is False
class TestSendWithReplyToMode:
"""Tests for send() method respecting reply_to_mode."""
@pytest.mark.asyncio
async def test_off_mode_no_reply_threading(self, adapter_factory):
adapter = adapter_factory(reply_to_mode="off")
adapter._bot = MagicMock()
adapter._bot.send_message = AsyncMock(return_value=MagicMock(message_id=1))
# Mock truncate to return multiple chunks
adapter.truncate_message = lambda content, max_len: ["chunk1", "chunk2", "chunk3"]
await adapter.send("12345", "test content", reply_to="999")
# Verify none of the calls had reply_to_message_id
for call in adapter._bot.send_message.call_args_list:
assert call.kwargs.get("reply_to_message_id") is None
@pytest.mark.asyncio
async def test_first_mode_only_first_chunk_threads(self, adapter_factory):
adapter = adapter_factory(reply_to_mode="first")
adapter._bot = MagicMock()
adapter._bot.send_message = AsyncMock(return_value=MagicMock(message_id=1))
adapter.truncate_message = lambda content, max_len: ["chunk1", "chunk2", "chunk3"]
await adapter.send("12345", "test content", reply_to="999")
calls = adapter._bot.send_message.call_args_list
assert len(calls) == 3
# First chunk should have reply_to_message_id
assert calls[0].kwargs.get("reply_to_message_id") == 999
# Remaining chunks should not
assert calls[1].kwargs.get("reply_to_message_id") is None
assert calls[2].kwargs.get("reply_to_message_id") is None
@pytest.mark.asyncio
async def test_all_mode_all_chunks_thread(self, adapter_factory):
adapter = adapter_factory(reply_to_mode="all")
adapter._bot = MagicMock()
adapter._bot.send_message = AsyncMock(return_value=MagicMock(message_id=1))
adapter.truncate_message = lambda content, max_len: ["chunk1", "chunk2", "chunk3"]
await adapter.send("12345", "test content", reply_to="999")
calls = adapter._bot.send_message.call_args_list
assert len(calls) == 3
# All chunks should have reply_to_message_id
for call in calls:
assert call.kwargs.get("reply_to_message_id") == 999
@pytest.mark.asyncio
async def test_no_reply_to_param_no_threading(self, adapter_factory):
adapter = adapter_factory(reply_to_mode="all")
adapter._bot = MagicMock()
adapter._bot.send_message = AsyncMock(return_value=MagicMock(message_id=1))
adapter.truncate_message = lambda content, max_len: ["chunk1", "chunk2"]
await adapter.send("12345", "test content", reply_to=None)
calls = adapter._bot.send_message.call_args_list
for call in calls:
assert call.kwargs.get("reply_to_message_id") is None
@pytest.mark.asyncio
async def test_single_chunk_respects_mode(self, adapter_factory):
adapter = adapter_factory(reply_to_mode="first")
adapter._bot = MagicMock()
adapter._bot.send_message = AsyncMock(return_value=MagicMock(message_id=1))
adapter.truncate_message = lambda content, max_len: ["single chunk"]
await adapter.send("12345", "test", reply_to="999")
calls = adapter._bot.send_message.call_args_list
assert len(calls) == 1
assert calls[0].kwargs.get("reply_to_message_id") == 999
class TestConfigSerialization:
"""Tests for reply_to_mode serialization."""
def test_to_dict_includes_reply_to_mode(self):
config = PlatformConfig(enabled=True, token="test", reply_to_mode="all")
result = config.to_dict()
assert result["reply_to_mode"] == "all"
def test_from_dict_loads_reply_to_mode(self):
data = {"enabled": True, "token": "test", "reply_to_mode": "off"}
config = PlatformConfig.from_dict(data)
assert config.reply_to_mode == "off"
def test_from_dict_defaults_to_first(self):
data = {"enabled": True, "token": "test"}
config = PlatformConfig.from_dict(data)
assert config.reply_to_mode == "first"

tests/test_streaming.py (new file, 335 lines)
View File

@@ -0,0 +1,335 @@
"""Unit tests for streaming support.
Tests cover:
- _run_streaming_chat_completion: text tokens, tool calls, fallback on error,
no callback, end signal
- _interruptible_api_call routing to streaming when stream_callback is set
- Streaming config reading from config.yaml
"""
import json
import threading
from types import SimpleNamespace
from unittest.mock import MagicMock, patch, PropertyMock
import pytest
from run_agent import AIAgent
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _make_tool_defs(*names: str) -> list:
"""Build minimal tool definition list accepted by AIAgent.__init__."""
return [
{
"type": "function",
"function": {
"name": n,
"description": f"{n} tool",
"parameters": {"type": "object", "properties": {}},
},
}
for n in names
]
@pytest.fixture()
def agent():
"""Minimal AIAgent with mocked client, no stream_callback."""
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
a = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
a.client = MagicMock()
return a
@pytest.fixture()
def streaming_agent():
"""Agent with a stream_callback set."""
collected = []
def _cb(delta):
collected.append(delta)
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
a = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
stream_callback=_cb,
)
a.client = MagicMock()
a._collected_tokens = collected
return a
# ---------------------------------------------------------------------------
# Helpers — build fake streaming chunks
# ---------------------------------------------------------------------------
def _make_text_chunks(*texts):
"""Return a list of SimpleNamespace chunks containing text deltas."""
chunks = []
for t in texts:
chunks.append(SimpleNamespace(
choices=[SimpleNamespace(
delta=SimpleNamespace(content=t, tool_calls=None),
finish_reason=None,
)],
usage=None,
))
# Final chunk with usage info
chunks.append(SimpleNamespace(
choices=[],
usage=SimpleNamespace(prompt_tokens=10, completion_tokens=5, total_tokens=15),
))
return chunks
def _make_tool_call_chunks():
"""Return chunks that simulate a tool call response."""
chunks = [
# First chunk: tool call id + name
SimpleNamespace(
choices=[SimpleNamespace(
delta=SimpleNamespace(
content=None,
tool_calls=[SimpleNamespace(
index=0,
id="call_123",
function=SimpleNamespace(name="web_search", arguments=""),
)],
),
finish_reason=None,
)],
usage=None,
),
# Second chunk: tool call arguments
SimpleNamespace(
choices=[SimpleNamespace(
delta=SimpleNamespace(
content=None,
tool_calls=[SimpleNamespace(
index=0,
id=None,
function=SimpleNamespace(name=None, arguments='{"query": "test"}'),
)],
),
finish_reason=None,
)],
usage=None,
),
# Final usage chunk
SimpleNamespace(choices=[], usage=SimpleNamespace(
prompt_tokens=20, completion_tokens=10, total_tokens=30,
)),
]
return chunks
# ---------------------------------------------------------------------------
# Tests: _run_streaming_chat_completion
# ---------------------------------------------------------------------------
class TestRunStreamingChatCompletion:
"""Tests for AIAgent._run_streaming_chat_completion."""
def test_text_tokens_streamed_via_callback(self, streaming_agent):
"""Text deltas are forwarded to stream_callback and accumulated."""
chunks = _make_text_chunks("Hello", " ", "world")
streaming_agent.client.chat.completions.create.return_value = iter(chunks)
result = streaming_agent._run_streaming_chat_completion({"model": "test"})
assert result.choices[0].message.content == "Hello world"
# Callback received each token + None end signal
assert streaming_agent._collected_tokens == ["Hello", " ", "world", None]
def test_tool_calls_accumulated(self, streaming_agent):
"""Tool call deltas are aggregated into a proper tool_calls list."""
chunks = _make_tool_call_chunks()
streaming_agent.client.chat.completions.create.return_value = iter(chunks)
result = streaming_agent._run_streaming_chat_completion({"model": "test"})
assert result.choices[0].message.tool_calls is not None
tc = result.choices[0].message.tool_calls[0]
assert tc.function.name == "web_search"
assert '"query"' in tc.function.arguments
def test_fallback_on_streaming_error(self, streaming_agent):
"""Falls back to non-streaming on error."""
# First call (streaming) raises; second call (fallback) succeeds
fallback_response = SimpleNamespace(
choices=[SimpleNamespace(
message=SimpleNamespace(content="fallback", tool_calls=None, role="assistant"),
finish_reason="stop",
)],
usage=SimpleNamespace(prompt_tokens=5, completion_tokens=3, total_tokens=8),
model="test",
)
call_count = [0]
def _side_effect(**kwargs):
call_count[0] += 1
if kwargs.get("stream"):
raise ConnectionError("stream broke")
return fallback_response
streaming_agent.client.chat.completions.create.side_effect = _side_effect
result = streaming_agent._run_streaming_chat_completion({"model": "test"})
assert result.choices[0].message.content == "fallback"
assert call_count[0] == 2 # streaming attempt + fallback
# Callback should still get None (end signal) even on error
assert None in streaming_agent._collected_tokens
def test_no_callback_still_works(self, agent):
"""Streaming works even without a callback (just accumulates)."""
chunks = _make_text_chunks("ok")
agent.client.chat.completions.create.return_value = iter(chunks)
result = agent._run_streaming_chat_completion({"model": "test"})
assert result.choices[0].message.content == "ok"
def test_end_signal_sent(self, streaming_agent):
"""stream_callback(None) is sent after all tokens."""
chunks = _make_text_chunks("done")
streaming_agent.client.chat.completions.create.return_value = iter(chunks)
streaming_agent._run_streaming_chat_completion({"model": "test"})
assert streaming_agent._collected_tokens[-1] is None
def test_usage_captured_from_final_chunk(self, streaming_agent):
"""Usage stats from the final usage-only chunk are returned."""
chunks = _make_text_chunks("hi")
streaming_agent.client.chat.completions.create.return_value = iter(chunks)
result = streaming_agent._run_streaming_chat_completion({"model": "test"})
assert result.usage is not None
assert result.usage.prompt_tokens == 10
assert result.usage.completion_tokens == 5
# ---------------------------------------------------------------------------
# Tests: _interruptible_api_call routing
# ---------------------------------------------------------------------------
class TestInterruptibleApiCallRouting:
"""Tests that _interruptible_api_call routes to streaming when callback is set."""
def test_routes_to_streaming_with_callback(self, streaming_agent):
"""When stream_callback is set, _interruptible_api_call uses streaming."""
chunks = _make_text_chunks("streamed")
streaming_agent.client.chat.completions.create.return_value = iter(chunks)
# Mock _interrupt_requested to False
streaming_agent._interrupt_requested = False
result = streaming_agent._interruptible_api_call({"model": "test"})
assert result.choices[0].message.content == "streamed"
# Verify the callback got tokens
assert "streamed" in streaming_agent._collected_tokens
def test_routes_to_normal_without_callback(self, agent):
"""When no stream_callback, _interruptible_api_call uses normal completion."""
normal_response = SimpleNamespace(
choices=[SimpleNamespace(
message=SimpleNamespace(content="normal", tool_calls=None, role="assistant"),
finish_reason="stop",
)],
usage=SimpleNamespace(prompt_tokens=5, completion_tokens=3, total_tokens=8),
model="test",
)
agent.client.chat.completions.create.return_value = normal_response
agent._interrupt_requested = False
result = agent._interruptible_api_call({"model": "test"})
assert result.choices[0].message.content == "normal"
# ---------------------------------------------------------------------------
# Tests: Streaming config
# ---------------------------------------------------------------------------
class TestStreamingConfig:
"""Tests for reading streaming configuration."""
def test_streaming_disabled_by_default(self):
"""Without any config, streaming is disabled."""
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
a = AIAgent(
api_key="test-key",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
assert a.stream_callback is None
def test_stream_callback_stored_on_agent(self):
"""stream_callback passed to constructor is stored on the agent."""
cb = lambda delta: None
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
a = AIAgent(
api_key="test-key",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
stream_callback=cb,
)
assert a.stream_callback is cb
def test_gateway_streaming_config_structure(self):
"""Verify the expected streaming config structure from gateway/run.py."""
# This tests that _read_streaming_config (if it exists) returns
# the right structure. We mock the config file content.
try:
from gateway.run import _read_streaming_config
except ImportError:
pytest.skip("gateway.run._read_streaming_config not available")
mock_cfg = {
"streaming": {
"enabled": True,
"telegram": True,
"discord": False,
"edit_interval": 2.0,
}
}
with patch("builtins.open", MagicMock()):
with patch("yaml.safe_load", return_value=mock_cfg):
try:
result = _read_streaming_config()
assert result.get("enabled") is True
except Exception:
# Function might not exist yet or have different signature
pytest.skip("_read_streaming_config has different interface")

View File

@@ -0,0 +1,223 @@
---
sidebar_position: 14
title: "API Server"
description: "Expose hermes-agent as an OpenAI-compatible API for any frontend"
---
# API Server
The API server exposes hermes-agent as an OpenAI-compatible HTTP endpoint. Any frontend that speaks the OpenAI format — Open WebUI, LobeChat, LibreChat, NextChat, ChatBox, and hundreds more — can connect to hermes-agent and use it as a backend.
Your agent handles requests with its full toolset (terminal, file operations, web search, memory, skills) and returns the final response. Tool calls execute invisibly server-side.
## Quick Start
### 1. Enable the API server
Add to `~/.hermes/.env`:
```bash
API_SERVER_ENABLED=true
```
### 2. Start the gateway
```bash
hermes gateway
```
You'll see:
```
[API Server] API server listening on http://127.0.0.1:8642
```
### 3. Connect a frontend
Point any OpenAI-compatible client at `http://localhost:8642/v1`:
```bash
# Test with curl
curl http://localhost:8642/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{"model": "hermes-agent", "messages": [{"role": "user", "content": "Hello!"}]}'
```
Or connect Open WebUI, LobeChat, or any other frontend — see the [Open WebUI integration guide](/docs/user-guide/messaging/open-webui) for step-by-step instructions.
## Endpoints
### POST /v1/chat/completions
Standard OpenAI Chat Completions format. Stateless — the full conversation is included in each request via the `messages` array.
**Request:**
```json
{
"model": "hermes-agent",
"messages": [
{"role": "system", "content": "You are a Python expert."},
{"role": "user", "content": "Write a fibonacci function"}
],
"stream": false
}
```
**Response:**
```json
{
"id": "chatcmpl-abc123",
"object": "chat.completion",
"created": 1710000000,
"model": "hermes-agent",
"choices": [{
"index": 0,
"message": {"role": "assistant", "content": "Here's a fibonacci function..."},
"finish_reason": "stop"
}],
"usage": {"prompt_tokens": 50, "completion_tokens": 200, "total_tokens": 250}
}
```
**Streaming** (`"stream": true`): Returns Server-Sent Events (SSE) with token-by-token response chunks. When streaming is enabled in config, tokens are emitted live as the LLM generates them. When disabled, the full response is sent as a single SSE chunk.
### POST /v1/responses
OpenAI Responses API format. Supports server-side conversation state via `previous_response_id` — the server stores full conversation history (including tool calls and results) so multi-turn context is preserved without the client managing it.
**Request:**
```json
{
"model": "hermes-agent",
"input": "What files are in my project?",
"instructions": "You are a helpful coding assistant.",
"store": true
}
```
**Response:**
```json
{
"id": "resp_abc123",
"object": "response",
"status": "completed",
"model": "hermes-agent",
"output": [
{"type": "function_call", "name": "terminal", "arguments": "{\"command\": \"ls\"}", "call_id": "call_1"},
{"type": "function_call_output", "call_id": "call_1", "output": "README.md src/ tests/"},
{"type": "message", "role": "assistant", "content": [{"type": "output_text", "text": "Your project has..."}]}
],
"usage": {"input_tokens": 50, "output_tokens": 200, "total_tokens": 250}
}
```
#### Multi-turn with previous_response_id
Chain responses to maintain full context (including tool calls) across turns:
```json
{
"input": "Now show me the README",
"previous_response_id": "resp_abc123"
}
```
The server reconstructs the full conversation from the stored response chain — all previous tool calls and results are preserved.
#### Named conversations
Use the `conversation` parameter instead of tracking response IDs:
```json
{"input": "Hello", "conversation": "my-project"}
{"input": "What's in src/?", "conversation": "my-project"}
{"input": "Run the tests", "conversation": "my-project"}
```
The server automatically chains to the latest response in that conversation, similar to the `/title` command for gateway sessions.
### GET /v1/responses/{id}
Retrieve a previously stored response by ID.
### DELETE /v1/responses/{id}
Delete a stored response.
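For example, using the response ID from the earlier example:
```bash
# Fetch the stored response
curl http://localhost:8642/v1/responses/resp_abc123

# Delete it once it's no longer needed
curl -X DELETE http://localhost:8642/v1/responses/resp_abc123
```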
### GET /v1/models
Lists `hermes-agent` as an available model. Required by most frontends for model discovery.
### GET /health
Health check. Returns `{"status": "ok"}`.
## System Prompt Handling
When a frontend sends a `system` message (Chat Completions) or `instructions` field (Responses API), hermes-agent **layers it on top** of its core system prompt. Your agent keeps all its tools, memory, and skills — the frontend's system prompt adds extra instructions.
This means you can customize behavior per-frontend without losing capabilities:
- Open WebUI system prompt: "You are a Python expert. Always include type hints."
- The agent still has terminal, file tools, web search, memory, etc.
## Authentication
Bearer token auth via the `Authorization` header:
```
Authorization: Bearer ***
```
Configure the key via `API_SERVER_KEY` env var. If no key is set, all requests are allowed (for local-only use).
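With a key configured, include it as a Bearer token on every request (`your-secret-key` is a placeholder for whatever you set in `API_SERVER_KEY`):
```bash
curl http://localhost:8642/v1/chat/completions \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer your-secret-key" \
  -d '{"model": "hermes-agent", "messages": [{"role": "user", "content": "Hello!"}]}'
```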
:::warning Security
The API server gives full access to hermes-agent's toolset, **including terminal commands**. If you change the bind address to `0.0.0.0` (network-accessible), **always set `API_SERVER_KEY`** — without it, anyone on your network can execute arbitrary commands on your machine.
The default bind address (`127.0.0.1`) is safe for local-only use.
:::
## Configuration
### Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `API_SERVER_ENABLED` | `false` | Enable the API server |
| `API_SERVER_PORT` | `8642` | HTTP server port |
| `API_SERVER_HOST` | `127.0.0.1` | Bind address (localhost only by default) |
| `API_SERVER_KEY` | _(none)_ | Bearer token for auth |
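Putting the variables together, a typical `~/.hermes/.env` snippet looks like this (values other than the key are the defaults):
```bash
# ~/.hermes/.env
API_SERVER_ENABLED=true
API_SERVER_HOST=127.0.0.1        # keep localhost-only unless you also set a key
API_SERVER_PORT=8642
API_SERVER_KEY=your-secret-key   # optional for local use; set it if the server is reachable beyond localhost
```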
### config.yaml
```yaml
# Not yet supported — use environment variables.
# config.yaml support coming in a future release.
```
## CORS
The API server includes CORS headers on all responses (`Access-Control-Allow-Origin: *`), so browser-based frontends can connect directly.
## Compatible Frontends
Any frontend that supports the OpenAI API format works. Tested/documented integrations:
| Frontend | Stars | Connection |
|----------|-------|------------|
| [Open WebUI](/docs/user-guide/messaging/open-webui) | 126k | Full guide available |
| LobeChat | 73k | Custom provider endpoint |
| LibreChat | 34k | Custom endpoint in librechat.yaml |
| AnythingLLM | 56k | Generic OpenAI provider |
| NextChat | 87k | BASE_URL env var |
| ChatBox | 39k | API Host setting |
| Jan | 26k | Remote model config |
| HF Chat-UI | 8k | OPENAI_BASE_URL |
| big-AGI | 7k | Custom endpoint |
| OpenAI Python SDK | — | `OpenAI(base_url="http://localhost:8642/v1")` |
| curl | — | Direct HTTP requests |
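As a minimal sketch with the official OpenAI Python SDK (the API key only needs to match `API_SERVER_KEY`; any non-empty string works when no key is configured):
```python
from openai import OpenAI

# Point the SDK at the local hermes-agent gateway instead of api.openai.com
client = OpenAI(base_url="http://localhost:8642/v1", api_key="not-needed")

response = client.chat.completions.create(
    model="hermes-agent",
    messages=[{"role": "user", "content": "What files are in my project?"}],
)
print(response.choices[0].message.content)
```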
## Limitations
- **Response storage is in-memory** — stored responses (for `previous_response_id`) are lost on gateway restart. Max 100 stored responses (LRU eviction).
- **No file upload** — vision/document analysis via uploaded files is not yet supported through the API.
- **Model field is cosmetic** — the `model` field in requests is accepted but the actual LLM model used is configured server-side in config.yaml.

View File

@@ -0,0 +1,210 @@
---
sidebar_position: 15
title: "Streaming"
description: "Token-by-token live response display across all platforms"
---
# Streaming Responses
When enabled, hermes-agent streams LLM responses token-by-token instead of waiting for the full generation. Users see the response typing out live — the same experience as ChatGPT, Claude, or Gemini.
Streaming is **disabled by default** and can be enabled globally or per-platform.
## How It Works
```
LLM generates tokens → callback fires per token → queue → consumer displays
Telegram/Discord/Slack:
Token arrives → Accumulate → Every 1.5s, edit the message with new text + ▌ cursor
Done → Final edit removes cursor
API Server:
Token arrives → SSE event sent to client immediately
Done → finish chunk + [DONE]
```
The agent's internal operation doesn't change — tools still execute normally, memory and skills work as before. Streaming only affects how the **final text response** is delivered to the user.
## Enable Streaming
### Option 1: Environment variable
```bash
# Enable for all platforms
export HERMES_STREAMING_ENABLED=true
hermes gateway
```
### Option 2: config.yaml
```yaml
streaming:
enabled: true # Master switch
```
### Option 3: Per-platform
```yaml
streaming:
enabled: false # Off by default
telegram: true # But on for Telegram
discord: true # And Discord
api_server: true # And the API server
```
## Platform Support
| Platform | Streaming Method | Rate Limit | Notes |
|----------|-----------------|------------|-------|
| **Telegram** | Progressive message editing | ~20 edits/min | 1.5s edit interval, ▌ cursor |
| **Discord** | Progressive message editing | 5 edits/5s | 1.5s edit interval |
| **Slack** | Progressive message editing | ~50 calls/min | 1.5s edit interval |
| **API Server** | SSE (Server-Sent Events) | No limit | Real token-by-token events |
| **WhatsApp** | ❌ Not supported | — | No message editing API |
| **Home Assistant** | ❌ Not supported | — | No message editing API |
| **CLI** | ❌ Not yet implemented | — | KawaiiSpinner provides feedback |
Platforms without message editing support automatically fall back to non-streaming (the response appears all at once, as before).
## What Users See
### Telegram/Discord/Slack
1. Agent starts working (typing indicator shows)
2. After ~20 tokens, a message appears with partial text and a ▌ cursor
3. Every 1.5 seconds, the message is edited with more accumulated text
4. When the response is complete, the cursor disappears
Tool progress messages still work alongside streaming — tool names/previews appear as before, and the streamed response is shown in a separate message.
### API Server (frontends like Open WebUI)
When `stream: true` is set in the request, the API server returns Server-Sent Events:
```
data: {"choices":[{"delta":{"role":"assistant"}}]}
data: {"choices":[{"delta":{"content":"Here"}}]}
data: {"choices":[{"delta":{"content":" is"}}]}
data: {"choices":[{"delta":{"content":" the"}}]}
data: {"choices":[{"delta":{"content":" answer"}}]}
data: {"choices":[{"delta":{},"finish_reason":"stop"}]}
data: [DONE]
```
Frontends like Open WebUI display this as live typing.
## How It Works Internally
### Architecture
```
┌─────────────┐ stream_callback(delta) ┌──────────────────┐
│ LLM API │ ──────────────────────────► │ queue.Queue() │
│ (stream) │ (runs in agent thread) │ (thread-safe) │
└─────────────┘ └────────┬─────────┘
┌──────────────┼──────────┐
│ │ │
┌─────▼─────┐ ┌─────▼────┐ ┌──▼──────┐
│ Gateway │ │ API Svr │ │ CLI │
│ edit msg │ │ SSE evt │ │ (TODO) │
└───────────┘ └──────────┘ └─────────┘
```
1. `AIAgent.__init__` accepts an optional `stream_callback` function
2. When set, `_interruptible_api_call()` routes to `_run_streaming_chat_completion()` instead of the normal non-streaming path
3. The streaming method calls the OpenAI API with `stream=True`, iterates chunks, and calls `stream_callback(delta_text)` for each text token
4. Tool call deltas are accumulated silently (no streaming for tool arguments)
5. When the stream ends, `stream_callback(None)` signals completion
6. The method returns a fake response object compatible with the existing code path
7. If streaming fails for any reason, it falls back to a normal non-streaming API call
### Thread Safety
The agent runs in a background thread (via `_interruptible_api_call`). The consumer (gateway async task, API server SSE writer) runs in the main event loop. A `queue.Queue` bridges them — it's thread-safe by design.
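A minimal sketch of that bridge, using hypothetical names rather than the actual gateway internals: the callback runs in the agent thread and pushes deltas, while an async consumer drains the queue from the event loop.
```python
import asyncio
import queue

token_queue = queue.Queue()  # thread-safe handoff between agent thread and event loop

def stream_callback(delta):
    # Called from the agent thread for each text token; None marks the end of the stream.
    token_queue.put(delta)

async def consume_tokens():
    # Runs in the event loop: accumulate text and periodically update the display.
    buffer = ""
    while True:
        try:
            delta = token_queue.get_nowait()
        except queue.Empty:
            await asyncio.sleep(0.05)  # yield to the loop instead of blocking on the queue
            continue
        if delta is None:  # end signal
            return buffer
        buffer += delta    # a real consumer would edit the platform message every edit_interval seconds
```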
### Graceful Fallback
If the LLM provider doesn't support `stream=True` or the streaming connection fails, the agent automatically falls back to a non-streaming API call. The user gets the response normally, just without the live typing effect. No error is shown.
## Configuration Reference
```yaml
streaming:
enabled: false # Master switch (default: off)
# Per-platform overrides (optional):
telegram: true # Enable for Telegram
discord: true # Enable for Discord
slack: true # Enable for Slack
api_server: true # Enable for API server
# Tuning (optional):
edit_interval: 1.5 # Seconds between message edits (default: 1.5)
min_tokens: 20 # Tokens before first display (default: 20)
```
| Variable | Default | Description |
|----------|---------|-------------|
| `HERMES_STREAMING_ENABLED` | `false` | Master switch via env var |
| `streaming.enabled` | `false` | Master switch via config |
| `streaming.<platform>` | _(unset)_ | Per-platform override |
| `streaming.edit_interval` | `1.5` | Seconds between Telegram/Discord edits |
| `streaming.min_tokens` | `20` | Minimum tokens before first message |
## Interaction with Other Features
### Tool Execution
When the agent calls tools (terminal, file operations, web search, etc.), no text tokens are generated — tool arguments are accumulated silently. Tool progress messages continue to work as before. After tools finish, the next LLM call may produce the final text response, which streams normally.
### Context Compression
Compression happens between API calls, not during streaming. No interaction.
### Interrupts
If the user sends a new message while streaming, the agent is interrupted. The HTTP connection is closed (stopping token generation), accumulated text is shown as-is, and the new message is processed.
### Prompt Caching
Streaming doesn't affect prompt caching — the request is identical, just with `stream=True` added.
### Responses API (Codex)
The Codex/Responses API streaming path also supports the `stream_callback`. Token deltas from `response.output_text.delta` events are emitted via the callback.
## Troubleshooting
### Streaming isn't working
1. Check the config: `streaming.enabled: true` in config.yaml or `HERMES_STREAMING_ENABLED=true`
2. Check per-platform: `streaming.telegram: true` overrides the master switch
3. Restart the gateway after changing config
4. Check logs for "Streaming failed, falling back" — indicates the provider may not support streaming
### Response appears twice
If you see the response both in a progressively-edited message AND as a separate final message, this is a bug. The streaming system should suppress the normal send when tokens were delivered via streaming. Please file an issue.
### Messages update too slowly
The default edit interval is 1.5 seconds (to respect platform rate limits). You can lower it in config:
```yaml
streaming:
edit_interval: 1.0 # Faster updates (may hit rate limits)
```
Going below 1.0s risks Telegram rate limiting (429 errors).
### No streaming on WhatsApp/HomeAssistant
These platforms don't support message editing, so streaming automatically falls back to non-streaming. This is expected behavior.

View File

@@ -1,12 +1,12 @@
---
sidebar_position: 1
title: "Messaging Gateway"
description: "Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, or Email — architecture and setup overview"
description: "Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, Email, or any OpenAI-compatible frontend — architecture and setup overview"
---
# Messaging Gateway
Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, or Email. The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages.
Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, Email, or any OpenAI-compatible frontend (Open WebUI, LobeChat, etc.). The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages.
## Architecture
@@ -15,12 +15,12 @@ Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, or Email. The
│ Hermes Gateway │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ ┌────────┐ ┌───────┐│
│ │ Telegram │ │ Discord │ │ WhatsApp │ │ Slack │ │ Signal │ │ Email ││
│ │ Adapter │ │ Adapter │ │ Adapter │ │Adapter │ │Adapter │ │Adapter││
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └───┬────┘ └───┬────┘ └──┬────┘│
│ │ │ │ │ │ │ │
│ └─────────────┼────────────┼────────────┼──────────┼─────────
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ ┌────────┐ ┌───────┐ ┌──────────┐
│ │ Telegram │ │ Discord │ │ WhatsApp │ │ Slack │ │ Signal │ │ Email │ │API Server│
│ │ Adapter │ │ Adapter │ │ Adapter │ │Adapter │ │Adapter │ │Adapter│ │ (OpenAI) │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └───┬────┘ └───┬────┘ └──┬────┘ └────┬─────┘
│ │ │ │ │ │ │
│ └─────────────┼────────────┼────────────┼──────────┼─────────┼───────────┘
│ │ │
│ ┌────────▼────────┐ │
│ │ Session Store │ │
@@ -204,6 +204,7 @@ Each platform has its own toolset:
| Slack | `hermes-slack` | Full tools including terminal |
| Signal | `hermes-signal` | Full tools including terminal |
| Email | `hermes-email` | Full tools including terminal |
| API Server | `hermes-api_server` | Full tools including terminal |
## Next Steps
@@ -213,3 +214,5 @@ Each platform has its own toolset:
- [WhatsApp Setup](whatsapp.md)
- [Signal Setup](signal.md)
- [Email Setup](email.md)
- [Open WebUI Setup](open-webui.md)
- [API Server](../features/api-server.md)

View File

@@ -0,0 +1,213 @@
---
sidebar_position: 8
title: "Open WebUI"
description: "Connect Open WebUI to Hermes Agent via the OpenAI-compatible API server"
---
# Open WebUI Integration
[Open WebUI](https://github.com/open-webui/open-webui) (126k★) is the most popular self-hosted chat interface for AI. With Hermes Agent's built-in API server, you can use Open WebUI as a polished web frontend for your agent — complete with conversation management, user accounts, and a modern chat interface.
## Architecture
```
┌──────────────────┐ POST /v1/chat/completions ┌──────────────────────┐
│ Open WebUI │ ──────────────────────────────► │ hermes-agent │
│ (browser UI) │ SSE streaming response │ gateway API server │
│ port 3000 │ ◄────────────────────────────── │ port 8642 │
└──────────────────┘ └──────────────────────┘
```
Open WebUI connects to Hermes Agent's API server just like it would connect to OpenAI. Your agent handles the requests with its full toolset — terminal, file operations, web search, memory, skills — and returns the final response.
## Quick Setup
### 1. Enable the API server
Add to `~/.hermes/.env`:
```bash
API_SERVER_ENABLED=true
# Optional: set a key for auth (recommended if accessible beyond localhost)
# API_SERVER_KEY=your-secret-key
```
### 2. Start Hermes Agent gateway
```bash
hermes gateway
```
You should see:
```
[API Server] API server listening on http://127.0.0.1:8642
```
### 3. Start Open WebUI
```bash
docker run -d -p 3000:8080 \
-e OPENAI_API_BASE_URL=http://host.docker.internal:8642/v1 \
-e OPENAI_API_KEY=not-needed \
--add-host=host.docker.internal:host-gateway \
-v open-webui:/app/backend/data \
--name open-webui \
--restart always \
ghcr.io/open-webui/open-webui:main
```
If you set an `API_SERVER_KEY`, use it instead of `not-needed`:
```bash
-e OPENAI_API_KEY=your-secret-key
```
### 4. Open the UI
Go to **http://localhost:3000**. Create your admin account (the first user becomes admin). You should see **hermes-agent** in the model dropdown. Start chatting!
## Docker Compose Setup
For a more permanent setup, create a `docker-compose.yml`:
```yaml
services:
open-webui:
image: ghcr.io/open-webui/open-webui:main
ports:
- "3000:8080"
volumes:
- open-webui:/app/backend/data
environment:
- OPENAI_API_BASE_URL=http://host.docker.internal:8642/v1
- OPENAI_API_KEY=not-needed
extra_hosts:
- "host.docker.internal:host-gateway"
restart: always
volumes:
open-webui:
```
Then:
```bash
docker compose up -d
```
## Configuring via the Admin UI
If you prefer to configure the connection through the UI instead of environment variables:
1. Log in to Open WebUI at **http://localhost:3000**
2. Click your **profile avatar** → **Admin Settings**
3. Go to **Connections**
4. Under **OpenAI API**, click the **wrench icon** (Manage)
5. Click **+ Add New Connection**
6. Enter:
- **URL**: `http://host.docker.internal:8642/v1`
- **API Key**: your key or any non-empty value (e.g., `not-needed`)
7. Click the **checkmark** to verify the connection
8. **Save**
The **hermes-agent** model should now appear in the model dropdown.
:::warning
Environment variables only take effect on Open WebUI's **first launch**. After that, connection settings are stored in its internal database. To change them later, use the Admin UI or delete the Docker volume and start fresh.
:::
## API Type: Chat Completions vs Responses
Open WebUI supports two API modes when connecting to a backend:
| Mode | Format | When to use |
|------|--------|-------------|
| **Chat Completions** (default) | `/v1/chat/completions` | Recommended. Works out of the box. |
| **Responses** (experimental) | `/v1/responses` | For server-side conversation state via `previous_response_id`. |
### Using Chat Completions (recommended)
This is the default and requires no extra configuration. Open WebUI sends standard OpenAI-format requests and Hermes Agent responds accordingly. Each request includes the full conversation history.
### Using Responses API
To use the Responses API mode:
1. Go to **Admin Settings** → **Connections** → **OpenAI** → **Manage**
2. Edit your hermes-agent connection
3. Change **API Type** from "Chat Completions" to **"Responses (Experimental)"**
4. Save
With the Responses API, Open WebUI sends requests in the Responses format (`input` array + `instructions`), and Hermes Agent can preserve full tool call history across turns via `previous_response_id`.
:::note
Open WebUI currently manages conversation history client-side even in Responses mode — it sends the full message history in each request rather than using `previous_response_id`. The Responses API mode is mainly useful for future compatibility as frontends evolve.
:::
## How It Works
When you send a message in Open WebUI:
1. Open WebUI sends a `POST /v1/chat/completions` request with your message and conversation history
2. Hermes Agent creates an AIAgent instance with its full toolset
3. The agent processes your request — it may call tools (terminal, file operations, web search, etc.)
4. Tool calls happen invisibly server-side
5. The agent's final text response is returned to Open WebUI
6. Open WebUI displays the response in its chat interface
Your agent has access to all the same tools and capabilities as when using the CLI or Telegram — the only difference is the frontend.
## Configuration Reference
### Hermes Agent (API server)
| Variable | Default | Description |
|----------|---------|-------------|
| `API_SERVER_ENABLED` | `false` | Enable the API server |
| `API_SERVER_PORT` | `8642` | HTTP server port |
| `API_SERVER_HOST` | `127.0.0.1` | Bind address |
| `API_SERVER_KEY` | _(none)_ | Bearer token for auth. No key = allow all. |
### Open WebUI
| Variable | Description |
|----------|-------------|
| `OPENAI_API_BASE_URL` | Hermes Agent's API URL (include `/v1`) |
| `OPENAI_API_KEY` | Must be non-empty. Match your `API_SERVER_KEY`. |
## Troubleshooting
### No models appear in the dropdown
- **Check the URL has `/v1` suffix**: `http://host.docker.internal:8642/v1` (not just `:8642`)
- **Verify the gateway is running**: `curl http://localhost:8642/health` should return `{"status": "ok"}`
- **Check model listing**: `curl http://localhost:8642/v1/models` should return a list with `hermes-agent`
- **Docker networking**: From inside Docker, `localhost` means the container, not your host. Use `host.docker.internal` or `--network=host`.
### Connection test passes but no models load
This is almost always the missing `/v1` suffix. Open WebUI's connection test is a basic connectivity check — it doesn't verify model listing works.
### Response takes a long time
Hermes Agent may be executing multiple tool calls (reading files, running commands, searching the web) before producing its final response. This is normal for complex queries. Unless streaming is enabled, the response appears all at once when the agent finishes.
### "Invalid API key" errors
Make sure your `OPENAI_API_KEY` in Open WebUI matches the `API_SERVER_KEY` in Hermes Agent. If no key is configured on the Hermes side, any non-empty value works.
## Linux Docker (no Docker Desktop)
On Linux without Docker Desktop, `host.docker.internal` doesn't resolve by default. Options:
```bash
# Option 1: Add host mapping
docker run --add-host=host.docker.internal:host-gateway ...
# Option 2: Use host networking
docker run --network=host -e OPENAI_API_BASE_URL=http://localhost:8642/v1 ...
# Option 3: Use Docker bridge IP
docker run -e OPENAI_API_BASE_URL=http://172.17.0.1:8642/v1 ...
```

View File

@@ -161,6 +161,26 @@ Hermes Agent works in Telegram group chats with a few considerations:
- When privacy mode is off (or bot is admin), the bot sees all messages and can participate naturally
- `TELEGRAM_ALLOWED_USERS` still applies — only authorized users can trigger the bot, even in groups
## Reply Threading Mode
When a response is longer than 4096 characters, Telegram splits it into multiple message chunks. By default, only the first chunk is sent as a reply to your message (showing the "replied to" bubble). You can change this behavior:
| Mode | Behavior |
|------|----------|
| `first` (default) | First chunk replies to your message, rest are standalone |
| `all` | Every chunk replies to your message |
| `off` | No chunks reply — all sent as standalone messages |
### Configure via environment variable
```bash
TELEGRAM_REPLY_TO_MODE=off # or "first" or "all"
```
### Configure via gateway config
In your gateway configuration, set `reply_to_mode` on the Telegram platform config.
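A minimal sketch, assuming the Telegram platform block in the gateway config is keyed by platform name (the field name matches `PlatformConfig`; the surrounding file layout may differ in your setup):
```yaml
telegram:
  enabled: true
  token: "<your-bot-token>"
  reply_to_mode: all   # "off", "first" (default), or "all"
```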
## Recent Bot API Features (2024–2025)
- **Privacy policy:** Telegram now requires bots to have a privacy policy. Set one via BotFather with `/setprivacy_policy`, or Telegram may auto-generate a placeholder. This is particularly important if your bot is public-facing.