mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 15:01:34 +08:00
Compare commits
10 Commits
codex-port
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
79b3d36ba8 | ||
|
|
1334d5f014 | ||
|
|
b800e63137 | ||
|
|
d54280ea03 | ||
|
|
95d221c31c | ||
|
|
b2a4092783 | ||
|
|
b3c798d1b6 | ||
|
|
7ae208bfee | ||
|
|
7d771c2b1b | ||
|
|
58dc5c4af1 |
@@ -219,6 +219,22 @@ compression:
|
||||
# Options: "auto", "openrouter", "nous", "main"
|
||||
# summary_provider: "auto"
|
||||
|
||||
# =============================================================================
|
||||
# Streaming (live token-by-token response display)
|
||||
# =============================================================================
|
||||
# When enabled, LLM responses stream token-by-token instead of appearing
|
||||
# all at once. Supported on Telegram, Discord, Slack (via message editing)
|
||||
# and the API server (via SSE). Disabled by default.
|
||||
#
|
||||
# streaming:
|
||||
# enabled: false # Master switch (default: off)
|
||||
# # Per-platform overrides:
|
||||
# # telegram: true
|
||||
# # discord: true
|
||||
# # api_server: true
|
||||
# # edit_interval: 1.5 # Seconds between message edits (default: 1.5)
|
||||
# # min_tokens: 20 # Tokens before first display (default: 20)
|
||||
|
||||
# =============================================================================
|
||||
# Auxiliary Models (Advanced — Experimental)
|
||||
# =============================================================================
|
||||
|
||||
@@ -29,6 +29,7 @@ class Platform(Enum):
|
||||
SIGNAL = "signal"
|
||||
HOMEASSISTANT = "homeassistant"
|
||||
EMAIL = "email"
|
||||
API_SERVER = "api_server"
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -98,6 +99,12 @@ class PlatformConfig:
|
||||
api_key: Optional[str] = None # API key if different from token
|
||||
home_channel: Optional[HomeChannel] = None
|
||||
|
||||
# Reply threading mode (Telegram/Slack)
|
||||
# - "off": Never thread replies to original message
|
||||
# - "first": Only first chunk threads to user's message (default)
|
||||
# - "all": All chunks in multi-part replies thread to user's message
|
||||
reply_to_mode: str = "first"
|
||||
|
||||
# Platform-specific settings
|
||||
extra: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
@@ -105,6 +112,7 @@ class PlatformConfig:
|
||||
result = {
|
||||
"enabled": self.enabled,
|
||||
"extra": self.extra,
|
||||
"reply_to_mode": self.reply_to_mode,
|
||||
}
|
||||
if self.token:
|
||||
result["token"] = self.token
|
||||
@@ -125,6 +133,7 @@ class PlatformConfig:
|
||||
token=data.get("token"),
|
||||
api_key=data.get("api_key"),
|
||||
home_channel=home_channel,
|
||||
reply_to_mode=data.get("reply_to_mode", "first"),
|
||||
extra=data.get("extra", {}),
|
||||
)
|
||||
|
||||
@@ -171,6 +180,9 @@ class GatewayConfig:
|
||||
# Email uses extra dict for config (address + imap_host + smtp_host)
|
||||
elif platform == Platform.EMAIL and config.extra.get("address"):
|
||||
connected.append(platform)
|
||||
# API Server uses enabled flag only (no token needed)
|
||||
elif platform == Platform.API_SERVER:
|
||||
connected.append(platform)
|
||||
return connected
|
||||
|
||||
def get_home_channel(self, platform: Platform) -> Optional[HomeChannel]:
|
||||
@@ -346,6 +358,11 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
|
||||
config.platforms[Platform.TELEGRAM].enabled = True
|
||||
config.platforms[Platform.TELEGRAM].token = telegram_token
|
||||
|
||||
# Reply threading mode for Telegram (off/first/all)
|
||||
telegram_reply_mode = os.getenv("TELEGRAM_REPLY_TO_MODE", "").lower()
|
||||
if telegram_reply_mode in ("off", "first", "all"):
|
||||
config.platforms[Platform.TELEGRAM].reply_to_mode = telegram_reply_mode
|
||||
|
||||
telegram_home = os.getenv("TELEGRAM_HOME_CHANNEL")
|
||||
if telegram_home and Platform.TELEGRAM in config.platforms:
|
||||
config.platforms[Platform.TELEGRAM].home_channel = HomeChannel(
|
||||
@@ -446,6 +463,25 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
|
||||
name=os.getenv("EMAIL_HOME_ADDRESS_NAME", "Home"),
|
||||
)
|
||||
|
||||
# API Server
|
||||
api_server_enabled = os.getenv("API_SERVER_ENABLED", "").lower() in ("true", "1", "yes")
|
||||
api_server_key = os.getenv("API_SERVER_KEY", "")
|
||||
api_server_port = os.getenv("API_SERVER_PORT")
|
||||
api_server_host = os.getenv("API_SERVER_HOST")
|
||||
if api_server_enabled or api_server_key:
|
||||
if Platform.API_SERVER not in config.platforms:
|
||||
config.platforms[Platform.API_SERVER] = PlatformConfig()
|
||||
config.platforms[Platform.API_SERVER].enabled = True
|
||||
if api_server_key:
|
||||
config.platforms[Platform.API_SERVER].extra["key"] = api_server_key
|
||||
if api_server_port:
|
||||
try:
|
||||
config.platforms[Platform.API_SERVER].extra["port"] = int(api_server_port)
|
||||
except ValueError:
|
||||
pass
|
||||
if api_server_host:
|
||||
config.platforms[Platform.API_SERVER].extra["host"] = api_server_host
|
||||
|
||||
# Session settings
|
||||
idle_minutes = os.getenv("SESSION_IDLE_MINUTES")
|
||||
if idle_minutes:
|
||||
|
||||
783
gateway/platforms/api_server.py
Normal file
783
gateway/platforms/api_server.py
Normal file
@@ -0,0 +1,783 @@
|
||||
"""
|
||||
OpenAI-compatible API server platform adapter.
|
||||
|
||||
Exposes an HTTP server with endpoints:
|
||||
- POST /v1/chat/completions — OpenAI Chat Completions format (stateless)
|
||||
- POST /v1/responses — OpenAI Responses API format (stateful via previous_response_id)
|
||||
- GET /v1/responses/{response_id} — Retrieve a stored response
|
||||
- DELETE /v1/responses/{response_id} — Delete a stored response
|
||||
- GET /v1/models — lists hermes-agent as an available model
|
||||
- GET /health — health check
|
||||
|
||||
Any OpenAI-compatible frontend (Open WebUI, LobeChat, etc.) can connect
|
||||
to hermes-agent through this adapter.
|
||||
|
||||
Requires:
|
||||
- aiohttp (already available in the gateway)
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import collections
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
from functools import wraps
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
try:
|
||||
import aiohttp
|
||||
from aiohttp import web
|
||||
AIOHTTP_AVAILABLE = True
|
||||
except ImportError:
|
||||
AIOHTTP_AVAILABLE = False
|
||||
aiohttp = None # type: ignore[assignment]
|
||||
web = None # type: ignore[assignment]
|
||||
|
||||
from gateway.config import Platform, PlatformConfig
|
||||
from gateway.platforms.base import (
|
||||
BasePlatformAdapter,
|
||||
MessageEvent,
|
||||
MessageType,
|
||||
SendResult,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Default settings
|
||||
DEFAULT_HOST = "127.0.0.1"
|
||||
DEFAULT_PORT = 8642
|
||||
MAX_STORED_RESPONSES = 100
|
||||
|
||||
|
||||
def check_api_server_requirements() -> bool:
|
||||
"""Check if API server dependencies are available."""
|
||||
return AIOHTTP_AVAILABLE
|
||||
|
||||
|
||||
class ResponseStore:
|
||||
"""
|
||||
In-memory LRU store for Responses API state.
|
||||
|
||||
Each stored response includes the full internal conversation history
|
||||
(with tool calls and results) so it can be reconstructed on subsequent
|
||||
requests via previous_response_id.
|
||||
"""
|
||||
|
||||
def __init__(self, max_size: int = MAX_STORED_RESPONSES):
|
||||
self._store: collections.OrderedDict[str, Dict[str, Any]] = collections.OrderedDict()
|
||||
self._max_size = max_size
|
||||
|
||||
def get(self, response_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Retrieve a stored response by ID (moves to end for LRU)."""
|
||||
if response_id in self._store:
|
||||
self._store.move_to_end(response_id)
|
||||
return self._store[response_id]
|
||||
return None
|
||||
|
||||
def put(self, response_id: str, data: Dict[str, Any]) -> None:
|
||||
"""Store a response, evicting the oldest if at capacity."""
|
||||
if response_id in self._store:
|
||||
self._store.move_to_end(response_id)
|
||||
self._store[response_id] = data
|
||||
while len(self._store) > self._max_size:
|
||||
self._store.popitem(last=False)
|
||||
|
||||
def delete(self, response_id: str) -> bool:
|
||||
"""Remove a response from the store. Returns True if found and deleted."""
|
||||
if response_id in self._store:
|
||||
del self._store[response_id]
|
||||
return True
|
||||
return False
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._store)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CORS middleware
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_CORS_HEADERS = {
|
||||
"Access-Control-Allow-Origin": "*",
|
||||
"Access-Control-Allow-Methods": "GET, POST, DELETE, OPTIONS",
|
||||
"Access-Control-Allow-Headers": "Authorization, Content-Type",
|
||||
}
|
||||
|
||||
|
||||
if AIOHTTP_AVAILABLE:
|
||||
@web.middleware
|
||||
async def cors_middleware(request, handler):
|
||||
"""Add CORS headers to every response; handle OPTIONS preflight."""
|
||||
if request.method == "OPTIONS":
|
||||
return web.Response(status=200, headers=_CORS_HEADERS)
|
||||
response = await handler(request)
|
||||
response.headers.update(_CORS_HEADERS)
|
||||
return response
|
||||
else:
|
||||
cors_middleware = None # type: ignore[assignment]
|
||||
|
||||
|
||||
class APIServerAdapter(BasePlatformAdapter):
|
||||
"""
|
||||
OpenAI-compatible HTTP API server adapter.
|
||||
|
||||
Runs an aiohttp web server that accepts OpenAI-format requests
|
||||
and routes them through hermes-agent's AIAgent.
|
||||
"""
|
||||
|
||||
def __init__(self, config: PlatformConfig):
|
||||
super().__init__(config, Platform.API_SERVER)
|
||||
extra = config.extra or {}
|
||||
self._host: str = extra.get("host", os.getenv("API_SERVER_HOST", DEFAULT_HOST))
|
||||
self._port: int = int(extra.get("port", os.getenv("API_SERVER_PORT", str(DEFAULT_PORT))))
|
||||
self._api_key: str = extra.get("key", os.getenv("API_SERVER_KEY", ""))
|
||||
self._app: Optional["web.Application"] = None
|
||||
self._runner: Optional["web.AppRunner"] = None
|
||||
self._site: Optional["web.TCPSite"] = None
|
||||
self._response_store = ResponseStore()
|
||||
# Conversation name → latest response_id mapping
|
||||
self._conversations: Dict[str, str] = {}
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Auth helper
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _check_auth(self, request: "web.Request") -> Optional["web.Response"]:
|
||||
"""
|
||||
Validate Bearer token from Authorization header.
|
||||
|
||||
Returns None if auth is OK, or a 401 web.Response on failure.
|
||||
If no API key is configured, all requests are allowed.
|
||||
"""
|
||||
if not self._api_key:
|
||||
return None # No key configured — allow all (local-only use)
|
||||
|
||||
auth_header = request.headers.get("Authorization", "")
|
||||
if auth_header.startswith("Bearer "):
|
||||
token = auth_header[7:].strip()
|
||||
if token == self._api_key:
|
||||
return None # Auth OK
|
||||
|
||||
return web.json_response(
|
||||
{"error": {"message": "Invalid API key", "type": "invalid_request_error", "code": "invalid_api_key"}},
|
||||
status=401,
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Agent creation helper
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _create_agent(
|
||||
self,
|
||||
ephemeral_system_prompt: Optional[str] = None,
|
||||
session_id: Optional[str] = None,
|
||||
stream_callback=None,
|
||||
) -> Any:
|
||||
"""
|
||||
Create an AIAgent instance using the gateway's runtime config.
|
||||
|
||||
Uses _resolve_runtime_agent_kwargs() to pick up model, api_key,
|
||||
base_url, etc. from config.yaml / env vars.
|
||||
"""
|
||||
from run_agent import AIAgent
|
||||
from gateway.run import _resolve_runtime_agent_kwargs, _resolve_model
|
||||
|
||||
runtime_kwargs = _resolve_runtime_agent_kwargs()
|
||||
model = _resolve_model()
|
||||
|
||||
max_iterations = int(os.getenv("HERMES_MAX_ITERATIONS", "90"))
|
||||
|
||||
agent = AIAgent(
|
||||
model=model,
|
||||
**runtime_kwargs,
|
||||
max_iterations=max_iterations,
|
||||
quiet_mode=True,
|
||||
verbose_logging=False,
|
||||
ephemeral_system_prompt=ephemeral_system_prompt or None,
|
||||
session_id=session_id,
|
||||
platform="api_server",
|
||||
stream_callback=stream_callback,
|
||||
)
|
||||
return agent
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# HTTP Handlers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _handle_health(self, request: "web.Request") -> "web.Response":
|
||||
"""GET /health — simple health check."""
|
||||
return web.json_response({"status": "ok", "platform": "hermes-agent"})
|
||||
|
||||
async def _handle_models(self, request: "web.Request") -> "web.Response":
|
||||
"""GET /v1/models — return hermes-agent as an available model."""
|
||||
auth_err = self._check_auth(request)
|
||||
if auth_err:
|
||||
return auth_err
|
||||
|
||||
return web.json_response({
|
||||
"object": "list",
|
||||
"data": [
|
||||
{
|
||||
"id": "hermes-agent",
|
||||
"object": "model",
|
||||
"created": int(time.time()),
|
||||
"owned_by": "hermes",
|
||||
"permission": [],
|
||||
"root": "hermes-agent",
|
||||
"parent": None,
|
||||
}
|
||||
],
|
||||
})
|
||||
|
||||
async def _handle_chat_completions(self, request: "web.Request") -> "web.Response":
|
||||
"""POST /v1/chat/completions — OpenAI Chat Completions format."""
|
||||
auth_err = self._check_auth(request)
|
||||
if auth_err:
|
||||
return auth_err
|
||||
|
||||
# Parse request body
|
||||
try:
|
||||
body = await request.json()
|
||||
except (json.JSONDecodeError, Exception):
|
||||
return web.json_response(
|
||||
{"error": {"message": "Invalid JSON in request body", "type": "invalid_request_error"}},
|
||||
status=400,
|
||||
)
|
||||
|
||||
messages = body.get("messages")
|
||||
if not messages or not isinstance(messages, list):
|
||||
return web.json_response(
|
||||
{"error": {"message": "Missing or invalid 'messages' field", "type": "invalid_request_error"}},
|
||||
status=400,
|
||||
)
|
||||
|
||||
stream = body.get("stream", False)
|
||||
|
||||
# Extract system message (becomes ephemeral system prompt layered ON TOP of core)
|
||||
system_prompt = None
|
||||
conversation_messages: List[Dict[str, str]] = []
|
||||
|
||||
for msg in messages:
|
||||
role = msg.get("role", "")
|
||||
content = msg.get("content", "")
|
||||
if role == "system":
|
||||
# Accumulate system messages
|
||||
if system_prompt is None:
|
||||
system_prompt = content
|
||||
else:
|
||||
system_prompt = system_prompt + "\n" + content
|
||||
elif role in ("user", "assistant"):
|
||||
conversation_messages.append({"role": role, "content": content})
|
||||
|
||||
# Extract the last user message as the primary input
|
||||
user_message = ""
|
||||
history = []
|
||||
if conversation_messages:
|
||||
user_message = conversation_messages[-1].get("content", "")
|
||||
history = conversation_messages[:-1]
|
||||
|
||||
if not user_message:
|
||||
return web.json_response(
|
||||
{"error": {"message": "No user message found in messages", "type": "invalid_request_error"}},
|
||||
status=400,
|
||||
)
|
||||
|
||||
session_id = str(uuid.uuid4())
|
||||
completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
|
||||
model_name = body.get("model", "hermes-agent")
|
||||
created = int(time.time())
|
||||
|
||||
if stream:
|
||||
import queue as _q
|
||||
_stream_q = _q.Queue()
|
||||
def _on_api_token(delta):
|
||||
_stream_q.put(delta) # None = done
|
||||
|
||||
# Start agent in background
|
||||
agent_task = asyncio.ensure_future(self._run_agent(
|
||||
user_message=user_message,
|
||||
conversation_history=history,
|
||||
ephemeral_system_prompt=system_prompt,
|
||||
session_id=session_id,
|
||||
stream_callback=_on_api_token,
|
||||
))
|
||||
|
||||
return await self._write_real_sse_chat_completion(
|
||||
request, completion_id, model_name, created, _stream_q, agent_task
|
||||
)
|
||||
|
||||
# Non-streaming: run the agent and return full response
|
||||
try:
|
||||
result, usage = await self._run_agent(
|
||||
user_message=user_message,
|
||||
conversation_history=history,
|
||||
ephemeral_system_prompt=system_prompt,
|
||||
session_id=session_id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Error running agent for chat completions: %s", e, exc_info=True)
|
||||
return web.json_response(
|
||||
{"error": {"message": f"Internal server error: {e}", "type": "server_error"}},
|
||||
status=500,
|
||||
)
|
||||
|
||||
final_response = result.get("final_response", "")
|
||||
if not final_response:
|
||||
final_response = result.get("error", "(No response generated)")
|
||||
|
||||
response_data = {
|
||||
"id": completion_id,
|
||||
"object": "chat.completion",
|
||||
"created": created,
|
||||
"model": model_name,
|
||||
"choices": [
|
||||
{
|
||||
"index": 0,
|
||||
"message": {
|
||||
"role": "assistant",
|
||||
"content": final_response,
|
||||
},
|
||||
"finish_reason": "stop",
|
||||
}
|
||||
],
|
||||
"usage": {
|
||||
"prompt_tokens": usage.get("input_tokens", 0),
|
||||
"completion_tokens": usage.get("output_tokens", 0),
|
||||
"total_tokens": usage.get("total_tokens", 0),
|
||||
},
|
||||
}
|
||||
|
||||
return web.json_response(response_data)
|
||||
|
||||
async def _write_real_sse_chat_completion(
|
||||
self, request: "web.Request", completion_id: str, model: str,
|
||||
created: int, stream_q, agent_task,
|
||||
) -> "web.StreamResponse":
|
||||
"""Write real streaming SSE from agent's stream_callback queue."""
|
||||
import queue as _q
|
||||
|
||||
response = web.StreamResponse(
|
||||
status=200,
|
||||
headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache"},
|
||||
)
|
||||
await response.prepare(request)
|
||||
|
||||
# Role chunk
|
||||
role_chunk = {
|
||||
"id": completion_id, "object": "chat.completion.chunk",
|
||||
"created": created, "model": model,
|
||||
"choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}],
|
||||
}
|
||||
await response.write(f"data: {json.dumps(role_chunk)}\n\n".encode())
|
||||
|
||||
# Stream content chunks as they arrive from the agent
|
||||
loop = asyncio.get_event_loop()
|
||||
while True:
|
||||
try:
|
||||
delta = await loop.run_in_executor(None, lambda: stream_q.get(timeout=0.5))
|
||||
except _q.Empty:
|
||||
if agent_task.done():
|
||||
break
|
||||
continue
|
||||
|
||||
if delta is None: # End of stream
|
||||
break
|
||||
|
||||
content_chunk = {
|
||||
"id": completion_id, "object": "chat.completion.chunk",
|
||||
"created": created, "model": model,
|
||||
"choices": [{"index": 0, "delta": {"content": delta}, "finish_reason": None}],
|
||||
}
|
||||
await response.write(f"data: {json.dumps(content_chunk)}\n\n".encode())
|
||||
|
||||
# Get usage from completed agent
|
||||
usage = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
|
||||
try:
|
||||
result, agent_usage = await agent_task
|
||||
usage = agent_usage or usage
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Finish chunk
|
||||
finish_chunk = {
|
||||
"id": completion_id, "object": "chat.completion.chunk",
|
||||
"created": created, "model": model,
|
||||
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
|
||||
"usage": {
|
||||
"prompt_tokens": usage.get("input_tokens", 0),
|
||||
"completion_tokens": usage.get("output_tokens", 0),
|
||||
"total_tokens": usage.get("total_tokens", 0),
|
||||
},
|
||||
}
|
||||
await response.write(f"data: {json.dumps(finish_chunk)}\n\n".encode())
|
||||
await response.write(b"data: [DONE]\n\n")
|
||||
|
||||
return response
|
||||
|
||||
async def _handle_responses(self, request: "web.Request") -> "web.Response":
|
||||
"""POST /v1/responses — OpenAI Responses API format."""
|
||||
auth_err = self._check_auth(request)
|
||||
if auth_err:
|
||||
return auth_err
|
||||
|
||||
# Parse request body
|
||||
try:
|
||||
body = await request.json()
|
||||
except (json.JSONDecodeError, Exception):
|
||||
return web.json_response(
|
||||
{"error": {"message": "Invalid JSON in request body", "type": "invalid_request_error"}},
|
||||
status=400,
|
||||
)
|
||||
|
||||
raw_input = body.get("input")
|
||||
if raw_input is None:
|
||||
return web.json_response(
|
||||
{"error": {"message": "Missing 'input' field", "type": "invalid_request_error"}},
|
||||
status=400,
|
||||
)
|
||||
|
||||
instructions = body.get("instructions")
|
||||
previous_response_id = body.get("previous_response_id")
|
||||
conversation = body.get("conversation")
|
||||
store = body.get("store", True)
|
||||
|
||||
# conversation and previous_response_id are mutually exclusive
|
||||
if conversation and previous_response_id:
|
||||
return web.json_response(
|
||||
{"error": {"message": "Cannot use both 'conversation' and 'previous_response_id'", "type": "invalid_request_error"}},
|
||||
status=400,
|
||||
)
|
||||
|
||||
# Resolve conversation name to latest response_id
|
||||
if conversation:
|
||||
previous_response_id = self._conversations.get(conversation)
|
||||
# No error if conversation doesn't exist yet — it's a new conversation
|
||||
|
||||
# Normalize input to message list
|
||||
input_messages: List[Dict[str, str]] = []
|
||||
if isinstance(raw_input, str):
|
||||
input_messages = [{"role": "user", "content": raw_input}]
|
||||
elif isinstance(raw_input, list):
|
||||
for item in raw_input:
|
||||
if isinstance(item, str):
|
||||
input_messages.append({"role": "user", "content": item})
|
||||
elif isinstance(item, dict):
|
||||
role = item.get("role", "user")
|
||||
content = item.get("content", "")
|
||||
# Handle content that may be a list of content parts
|
||||
if isinstance(content, list):
|
||||
text_parts = []
|
||||
for part in content:
|
||||
if isinstance(part, dict) and part.get("type") == "input_text":
|
||||
text_parts.append(part.get("text", ""))
|
||||
elif isinstance(part, dict) and part.get("type") == "output_text":
|
||||
text_parts.append(part.get("text", ""))
|
||||
elif isinstance(part, str):
|
||||
text_parts.append(part)
|
||||
content = "\n".join(text_parts)
|
||||
input_messages.append({"role": role, "content": content})
|
||||
else:
|
||||
return web.json_response(
|
||||
{"error": {"message": "'input' must be a string or array", "type": "invalid_request_error"}},
|
||||
status=400,
|
||||
)
|
||||
|
||||
# Reconstruct conversation history from previous_response_id
|
||||
conversation_history: List[Dict[str, str]] = []
|
||||
if previous_response_id:
|
||||
stored = self._response_store.get(previous_response_id)
|
||||
if stored is None:
|
||||
return web.json_response(
|
||||
{"error": {"message": f"Previous response not found: {previous_response_id}", "type": "invalid_request_error"}},
|
||||
status=404,
|
||||
)
|
||||
conversation_history = list(stored.get("conversation_history", []))
|
||||
# If no instructions provided, carry forward from previous
|
||||
if instructions is None:
|
||||
instructions = stored.get("instructions")
|
||||
|
||||
# Append new input messages to history (all but the last become history)
|
||||
for msg in input_messages[:-1]:
|
||||
conversation_history.append(msg)
|
||||
|
||||
# Last input message is the user_message
|
||||
user_message = input_messages[-1].get("content", "") if input_messages else ""
|
||||
if not user_message:
|
||||
return web.json_response(
|
||||
{"error": {"message": "No user message found in input", "type": "invalid_request_error"}},
|
||||
status=400,
|
||||
)
|
||||
|
||||
# Truncation support
|
||||
if body.get("truncation") == "auto" and len(conversation_history) > 100:
|
||||
conversation_history = conversation_history[-100:]
|
||||
|
||||
# Run the agent
|
||||
session_id = str(uuid.uuid4())
|
||||
try:
|
||||
result, usage = await self._run_agent(
|
||||
user_message=user_message,
|
||||
conversation_history=conversation_history,
|
||||
ephemeral_system_prompt=instructions,
|
||||
session_id=session_id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Error running agent for responses: %s", e, exc_info=True)
|
||||
return web.json_response(
|
||||
{"error": {"message": f"Internal server error: {e}", "type": "server_error"}},
|
||||
status=500,
|
||||
)
|
||||
|
||||
final_response = result.get("final_response", "")
|
||||
if not final_response:
|
||||
final_response = result.get("error", "(No response generated)")
|
||||
|
||||
response_id = f"resp_{uuid.uuid4().hex[:28]}"
|
||||
created_at = int(time.time())
|
||||
|
||||
# Build the full conversation history for storage
|
||||
# (includes tool calls from the agent run)
|
||||
full_history = list(conversation_history)
|
||||
full_history.append({"role": "user", "content": user_message})
|
||||
# Add agent's internal messages if available
|
||||
agent_messages = result.get("messages", [])
|
||||
if agent_messages:
|
||||
full_history.extend(agent_messages)
|
||||
else:
|
||||
full_history.append({"role": "assistant", "content": final_response})
|
||||
|
||||
# Build output items (includes tool calls + final message)
|
||||
output_items = self._extract_output_items(result)
|
||||
|
||||
response_data = {
|
||||
"id": response_id,
|
||||
"object": "response",
|
||||
"status": "completed",
|
||||
"created_at": created_at,
|
||||
"model": body.get("model", "hermes-agent"),
|
||||
"output": output_items,
|
||||
"usage": {
|
||||
"input_tokens": usage.get("input_tokens", 0),
|
||||
"output_tokens": usage.get("output_tokens", 0),
|
||||
"total_tokens": usage.get("total_tokens", 0),
|
||||
},
|
||||
}
|
||||
|
||||
# Store the complete response object for future chaining / GET retrieval
|
||||
if store:
|
||||
self._response_store.put(response_id, {
|
||||
"response": response_data,
|
||||
"conversation_history": full_history,
|
||||
"instructions": instructions,
|
||||
})
|
||||
# Update conversation mapping so the next request with the same
|
||||
# conversation name automatically chains to this response
|
||||
if conversation:
|
||||
self._conversations[conversation] = response_id
|
||||
|
||||
return web.json_response(response_data)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Agent execution
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# GET / DELETE response endpoints
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _handle_get_response(self, request: "web.Request") -> "web.Response":
|
||||
"""GET /v1/responses/{response_id} — retrieve a stored response."""
|
||||
auth_err = self._check_auth(request)
|
||||
if auth_err:
|
||||
return auth_err
|
||||
|
||||
response_id = request.match_info["response_id"]
|
||||
stored = self._response_store.get(response_id)
|
||||
if stored is None:
|
||||
return web.json_response(
|
||||
{"error": {"message": f"Response not found: {response_id}", "type": "invalid_request_error"}},
|
||||
status=404,
|
||||
)
|
||||
|
||||
return web.json_response(stored["response"])
|
||||
|
||||
async def _handle_delete_response(self, request: "web.Request") -> "web.Response":
|
||||
"""DELETE /v1/responses/{response_id} — delete a stored response."""
|
||||
auth_err = self._check_auth(request)
|
||||
if auth_err:
|
||||
return auth_err
|
||||
|
||||
response_id = request.match_info["response_id"]
|
||||
deleted = self._response_store.delete(response_id)
|
||||
if not deleted:
|
||||
return web.json_response(
|
||||
{"error": {"message": f"Response not found: {response_id}", "type": "invalid_request_error"}},
|
||||
status=404,
|
||||
)
|
||||
|
||||
return web.json_response({
|
||||
"id": response_id,
|
||||
"object": "response",
|
||||
"deleted": True,
|
||||
})
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Output extraction helper
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def _extract_output_items(result: Dict[str, Any]) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Build the full output item array from the agent's messages.
|
||||
|
||||
Walks *result["messages"]* and emits:
|
||||
- ``function_call`` items for each tool_call on assistant messages
|
||||
- ``function_call_output`` items for each tool-role message
|
||||
- a final ``message`` item with the assistant's text reply
|
||||
"""
|
||||
items: List[Dict[str, Any]] = []
|
||||
messages = result.get("messages", [])
|
||||
|
||||
for msg in messages:
|
||||
role = msg.get("role")
|
||||
if role == "assistant" and msg.get("tool_calls"):
|
||||
for tc in msg["tool_calls"]:
|
||||
func = tc.get("function", {})
|
||||
items.append({
|
||||
"type": "function_call",
|
||||
"name": func.get("name", ""),
|
||||
"arguments": func.get("arguments", ""),
|
||||
"call_id": tc.get("id", ""),
|
||||
})
|
||||
elif role == "tool":
|
||||
items.append({
|
||||
"type": "function_call_output",
|
||||
"call_id": msg.get("tool_call_id", ""),
|
||||
"output": msg.get("content", ""),
|
||||
})
|
||||
|
||||
# Final assistant message
|
||||
final = result.get("final_response", "")
|
||||
if not final:
|
||||
final = result.get("error", "(No response generated)")
|
||||
|
||||
items.append({
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{
|
||||
"type": "output_text",
|
||||
"text": final,
|
||||
}
|
||||
],
|
||||
})
|
||||
return items
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Agent execution
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _run_agent(
|
||||
self,
|
||||
user_message: str,
|
||||
conversation_history: List[Dict[str, str]],
|
||||
ephemeral_system_prompt: Optional[str] = None,
|
||||
session_id: Optional[str] = None,
|
||||
stream_callback=None,
|
||||
) -> tuple:
|
||||
"""
|
||||
Create an agent and run a conversation in a thread executor.
|
||||
|
||||
Returns ``(result_dict, usage_dict)`` where *usage_dict* contains
|
||||
``input_tokens``, ``output_tokens`` and ``total_tokens``.
|
||||
"""
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
def _run():
|
||||
agent = self._create_agent(
|
||||
ephemeral_system_prompt=ephemeral_system_prompt,
|
||||
session_id=session_id,
|
||||
stream_callback=stream_callback,
|
||||
)
|
||||
result = agent.run_conversation(
|
||||
user_message=user_message,
|
||||
conversation_history=conversation_history,
|
||||
)
|
||||
usage = {
|
||||
"input_tokens": getattr(agent, "session_prompt_tokens", 0) or 0,
|
||||
"output_tokens": getattr(agent, "session_completion_tokens", 0) or 0,
|
||||
"total_tokens": getattr(agent, "session_total_tokens", 0) or 0,
|
||||
}
|
||||
return result, usage
|
||||
|
||||
return await loop.run_in_executor(None, _run)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# BasePlatformAdapter interface
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def connect(self) -> bool:
|
||||
"""Start the aiohttp web server."""
|
||||
if not AIOHTTP_AVAILABLE:
|
||||
logger.warning("[%s] aiohttp not installed", self.name)
|
||||
return False
|
||||
|
||||
try:
|
||||
self._app = web.Application(middlewares=[cors_middleware])
|
||||
self._app.router.add_get("/health", self._handle_health)
|
||||
self._app.router.add_get("/v1/models", self._handle_models)
|
||||
self._app.router.add_post("/v1/chat/completions", self._handle_chat_completions)
|
||||
self._app.router.add_post("/v1/responses", self._handle_responses)
|
||||
self._app.router.add_get("/v1/responses/{response_id}", self._handle_get_response)
|
||||
self._app.router.add_delete("/v1/responses/{response_id}", self._handle_delete_response)
|
||||
|
||||
self._runner = web.AppRunner(self._app)
|
||||
await self._runner.setup()
|
||||
self._site = web.TCPSite(self._runner, self._host, self._port)
|
||||
await self._site.start()
|
||||
|
||||
self._running = True
|
||||
logger.info(
|
||||
"[%s] API server listening on http://%s:%d",
|
||||
self.name, self._host, self._port,
|
||||
)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error("[%s] Failed to start API server: %s", self.name, e)
|
||||
return False
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
"""Stop the aiohttp web server."""
|
||||
self._running = False
|
||||
if self._site:
|
||||
await self._site.stop()
|
||||
self._site = None
|
||||
if self._runner:
|
||||
await self._runner.cleanup()
|
||||
self._runner = None
|
||||
self._app = None
|
||||
logger.info("[%s] API server stopped", self.name)
|
||||
|
||||
async def send(
|
||||
self,
|
||||
chat_id: str,
|
||||
content: str,
|
||||
reply_to: Optional[str] = None,
|
||||
metadata: Optional[Dict[str, Any]] = None,
|
||||
) -> SendResult:
|
||||
"""
|
||||
Not used — HTTP request/response cycle handles delivery directly.
|
||||
"""
|
||||
return SendResult(success=False, error="API server uses HTTP request/response, not send()")
|
||||
|
||||
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
|
||||
"""Return basic info about the API server."""
|
||||
return {
|
||||
"name": "API Server",
|
||||
"type": "api",
|
||||
"host": self._host,
|
||||
"port": self._port,
|
||||
}
|
||||
@@ -101,15 +101,17 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
- Sending responses with Telegram markdown
|
||||
- Forum topics (thread_id support)
|
||||
- Media messages
|
||||
- Reply threading modes (off/first/all)
|
||||
"""
|
||||
|
||||
# Telegram message limits
|
||||
MAX_MESSAGE_LENGTH = 4096
|
||||
|
||||
def __init__(self, config: PlatformConfig):
|
||||
super().__init__(config, Platform.TELEGRAM)
|
||||
self._app: Optional[Application] = None
|
||||
self._bot: Optional[Bot] = None
|
||||
self._reply_to_mode: str = getattr(config, 'reply_to_mode', 'first') or 'first'
|
||||
self._delivery_progress: Dict[str, bool] = {}
|
||||
|
||||
async def connect(self) -> bool:
|
||||
"""Connect to Telegram and start polling for updates."""
|
||||
@@ -206,6 +208,29 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
self._bot = None
|
||||
logger.info("[%s] Disconnected from Telegram", self.name)
|
||||
|
||||
def _should_thread_reply(self, chat_id: str, reply_to: Optional[str], chunk_index: int) -> bool:
|
||||
"""
|
||||
Determine if this message chunk should thread to the original message.
|
||||
|
||||
Args:
|
||||
chat_id: The chat ID
|
||||
reply_to: The original message ID to reply to
|
||||
chunk_index: Index of this chunk (0 = first chunk)
|
||||
|
||||
Returns:
|
||||
True if this chunk should be threaded to the original message
|
||||
"""
|
||||
if not reply_to:
|
||||
return False
|
||||
|
||||
mode = self._reply_to_mode
|
||||
if mode == "off":
|
||||
return False
|
||||
elif mode == "all":
|
||||
return True
|
||||
else: # "first" (default)
|
||||
return chunk_index == 0
|
||||
|
||||
async def send(
|
||||
self,
|
||||
chat_id: str,
|
||||
@@ -218,7 +243,6 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
return SendResult(success=False, error="Not connected")
|
||||
|
||||
try:
|
||||
# Format and split message if needed
|
||||
formatted = self.format_message(content)
|
||||
chunks = self.truncate_message(formatted, self.MAX_MESSAGE_LENGTH)
|
||||
|
||||
@@ -226,31 +250,30 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
thread_id = metadata.get("thread_id") if metadata else None
|
||||
|
||||
for i, chunk in enumerate(chunks):
|
||||
# Try Markdown first, fall back to plain text if it fails
|
||||
should_thread = self._should_thread_reply(chat_id, reply_to, i)
|
||||
reply_to_id = int(reply_to) if should_thread else None
|
||||
|
||||
try:
|
||||
msg = await self._bot.send_message(
|
||||
chat_id=int(chat_id),
|
||||
text=chunk,
|
||||
parse_mode=ParseMode.MARKDOWN_V2,
|
||||
reply_to_message_id=int(reply_to) if reply_to and i == 0 else None,
|
||||
reply_to_message_id=reply_to_id,
|
||||
message_thread_id=int(thread_id) if thread_id else None,
|
||||
)
|
||||
except Exception as md_error:
|
||||
# Markdown parsing failed, try plain text
|
||||
if "parse" in str(md_error).lower() or "markdown" in str(md_error).lower():
|
||||
logger.warning("[%s] MarkdownV2 parse failed, falling back to plain text: %s", self.name, md_error)
|
||||
# Strip MDV2 escape backslashes so the user doesn't
|
||||
# see raw backslashes littered through the message.
|
||||
plain_chunk = _strip_mdv2(chunk)
|
||||
msg = await self._bot.send_message(
|
||||
chat_id=int(chat_id),
|
||||
text=plain_chunk,
|
||||
parse_mode=None, # Plain text
|
||||
reply_to_message_id=int(reply_to) if reply_to and i == 0 else None,
|
||||
parse_mode=None,
|
||||
reply_to_message_id=reply_to_id,
|
||||
message_thread_id=int(thread_id) if thread_id else None,
|
||||
)
|
||||
else:
|
||||
raise # Re-raise if not a parse error
|
||||
raise
|
||||
message_ids.append(str(msg.message_id))
|
||||
|
||||
return SendResult(
|
||||
|
||||
200
gateway/run.py
200
gateway/run.py
@@ -19,6 +19,7 @@ import os
|
||||
import re
|
||||
import sys
|
||||
import signal
|
||||
import time
|
||||
import threading
|
||||
from logging.handlers import RotatingFileHandler
|
||||
from pathlib import Path
|
||||
@@ -165,6 +166,28 @@ from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageTyp
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _resolve_model() -> str:
|
||||
"""Resolve the model name from env vars and config.yaml.
|
||||
|
||||
Priority: HERMES_MODEL env > LLM_MODEL env > config.yaml model.default > fallback.
|
||||
"""
|
||||
model = os.getenv("HERMES_MODEL") or os.getenv("LLM_MODEL") or "anthropic/claude-opus-4.6"
|
||||
try:
|
||||
import yaml
|
||||
_cfg_path = Path.home() / ".hermes" / "config.yaml"
|
||||
if _cfg_path.exists():
|
||||
with open(_cfg_path, encoding="utf-8") as f:
|
||||
cfg = yaml.safe_load(f) or {}
|
||||
model_cfg = cfg.get("model", {})
|
||||
if isinstance(model_cfg, str):
|
||||
model = model_cfg
|
||||
elif isinstance(model_cfg, dict):
|
||||
model = model_cfg.get("default", model)
|
||||
except Exception:
|
||||
pass
|
||||
return model
|
||||
|
||||
|
||||
def _resolve_runtime_agent_kwargs() -> dict:
|
||||
"""Resolve provider credentials for gateway-created AIAgent instances."""
|
||||
from hermes_cli.runtime_provider import (
|
||||
@@ -206,6 +229,7 @@ class GatewayRunner:
|
||||
self._reasoning_config = self._load_reasoning_config()
|
||||
self._provider_routing = self._load_provider_routing()
|
||||
self._fallback_model = self._load_fallback_model()
|
||||
self._streaming_config = self._load_streaming_config()
|
||||
|
||||
# Wire process registry into session store for reset protection
|
||||
from tools.process_registry import process_registry
|
||||
@@ -460,6 +484,40 @@ class GatewayRunner:
|
||||
pass
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _load_streaming_config() -> dict:
|
||||
"""Load streaming config from config.yaml at startup.
|
||||
|
||||
Returns a dict like {"enabled": False, "telegram": True, ...}.
|
||||
Per-platform keys override the global 'enabled' flag.
|
||||
The HERMES_STREAMING_ENABLED env var overrides everything.
|
||||
"""
|
||||
config = {"enabled": False}
|
||||
try:
|
||||
import yaml as _y
|
||||
cfg_path = _hermes_home / "config.yaml"
|
||||
if cfg_path.exists():
|
||||
with open(cfg_path, encoding="utf-8") as _f:
|
||||
cfg = _y.safe_load(_f) or {}
|
||||
s_cfg = cfg.get("streaming", {})
|
||||
if isinstance(s_cfg, dict):
|
||||
config = s_cfg
|
||||
except Exception:
|
||||
pass
|
||||
# Env var override
|
||||
if os.getenv("HERMES_STREAMING_ENABLED", "").lower() in ("true", "1", "yes"):
|
||||
config["enabled"] = True
|
||||
return config
|
||||
|
||||
def _is_streaming_enabled(self, platform_key: str) -> bool:
|
||||
"""Check if streaming is enabled for a given platform."""
|
||||
cfg = self._streaming_config
|
||||
# Per-platform override
|
||||
if platform_key and cfg.get(platform_key) is not None:
|
||||
return str(cfg[platform_key]).lower() in ("true", "1", "yes")
|
||||
# Global default
|
||||
return str(cfg.get("enabled", False)).lower() in ("true", "1", "yes")
|
||||
|
||||
async def start(self) -> bool:
|
||||
"""
|
||||
Start the gateway and all configured platform adapters.
|
||||
@@ -679,6 +737,13 @@ class GatewayRunner:
|
||||
return None
|
||||
return EmailAdapter(config)
|
||||
|
||||
elif platform == Platform.API_SERVER:
|
||||
from gateway.platforms.api_server import APIServerAdapter, check_api_server_requirements
|
||||
if not check_api_server_requirements():
|
||||
logger.warning("API Server: aiohttp not installed")
|
||||
return None
|
||||
return APIServerAdapter(config)
|
||||
|
||||
return None
|
||||
|
||||
def _is_user_authorized(self, source: SessionSource) -> bool:
|
||||
@@ -698,6 +763,11 @@ class GatewayRunner:
|
||||
if source.platform == Platform.HOMEASSISTANT:
|
||||
return True
|
||||
|
||||
# API Server handles auth at the HTTP layer (Bearer token),
|
||||
# so requests that reach the gateway runner are already authorized.
|
||||
if source.platform == Platform.API_SERVER:
|
||||
return True
|
||||
|
||||
user_id = source.user_id
|
||||
if not user_id:
|
||||
return False
|
||||
@@ -709,6 +779,7 @@ class GatewayRunner:
|
||||
Platform.SLACK: "SLACK_ALLOWED_USERS",
|
||||
Platform.SIGNAL: "SIGNAL_ALLOWED_USERS",
|
||||
Platform.EMAIL: "EMAIL_ALLOWED_USERS",
|
||||
Platform.API_SERVER: "API_SERVER_ALLOWED_KEYS",
|
||||
}
|
||||
platform_allow_all_map = {
|
||||
Platform.TELEGRAM: "TELEGRAM_ALLOW_ALL_USERS",
|
||||
@@ -717,6 +788,7 @@ class GatewayRunner:
|
||||
Platform.SLACK: "SLACK_ALLOW_ALL_USERS",
|
||||
Platform.SIGNAL: "SIGNAL_ALLOW_ALL_USERS",
|
||||
Platform.EMAIL: "EMAIL_ALLOW_ALL_USERS",
|
||||
Platform.API_SERVER: "API_SERVER_ALLOW_ALL",
|
||||
}
|
||||
|
||||
# Per-platform allow-all flag (e.g., DISCORD_ALLOW_ALL_USERS=true)
|
||||
@@ -1408,6 +1480,23 @@ class GatewayRunner:
|
||||
session_entry.session_key,
|
||||
last_prompt_tokens=agent_result.get("last_prompt_tokens", 0),
|
||||
)
|
||||
|
||||
# If streaming already delivered the response via progressive edits,
|
||||
# do a final edit with the post-processed text and suppress the
|
||||
# normal send to avoid duplicating the message.
|
||||
_streamed_id = agent_result.get("_streamed_msg_id")
|
||||
if _streamed_id and response:
|
||||
adapter = self.adapters.get(source.platform)
|
||||
if adapter:
|
||||
try:
|
||||
await adapter.edit_message(
|
||||
chat_id=source.chat_id,
|
||||
message_id=_streamed_id,
|
||||
content=response,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return "" # Suppress normal send in base.py
|
||||
|
||||
return response
|
||||
|
||||
@@ -3047,7 +3136,18 @@ class GatewayRunner:
|
||||
agent_holder = [None] # Mutable container for the agent instance
|
||||
result_holder = [None] # Mutable container for the result
|
||||
tools_holder = [None] # Mutable container for the tool definitions
|
||||
|
||||
|
||||
# ── Streaming setup ─────────────────────────────────────────────
|
||||
_stream_q = None
|
||||
_stream_done = None
|
||||
_stream_msg_id = [None]
|
||||
_platform_key = source.platform.value if source.platform else ""
|
||||
_streaming_enabled = self._is_streaming_enabled(_platform_key)
|
||||
|
||||
if _streaming_enabled:
|
||||
_stream_q = queue.Queue()
|
||||
_stream_done = threading.Event()
|
||||
|
||||
# Bridge sync step_callback → async hooks.emit for agent:step events
|
||||
_loop_for_step = asyncio.get_event_loop()
|
||||
_hooks_ref = self.hooks
|
||||
@@ -3120,6 +3220,19 @@ class GatewayRunner:
|
||||
}
|
||||
|
||||
pr = self._provider_routing
|
||||
|
||||
# Streaming: build callback that feeds the async queue
|
||||
_on_stream_token = None
|
||||
if _stream_q is not None:
|
||||
_sq = _stream_q # capture for closure
|
||||
_sd = _stream_done
|
||||
def _on_stream_token(delta):
|
||||
if delta is None:
|
||||
if _sd:
|
||||
_sd.set()
|
||||
else:
|
||||
_sq.put(delta)
|
||||
|
||||
agent = AIAgent(
|
||||
model=model,
|
||||
**runtime_kwargs,
|
||||
@@ -3137,6 +3250,7 @@ class GatewayRunner:
|
||||
provider_require_parameters=pr.get("require_parameters", False),
|
||||
provider_data_collection=pr.get("data_collection"),
|
||||
session_id=session_id,
|
||||
stream_callback=_on_stream_token,
|
||||
tool_progress_callback=progress_callback if tool_progress_enabled else None,
|
||||
step_callback=_step_callback_sync if _hooks_ref.loaded_hooks else None,
|
||||
platform=platform_key,
|
||||
@@ -3263,7 +3377,7 @@ class GatewayRunner:
|
||||
unique_tags.insert(0, "[[audio_as_voice]]")
|
||||
final_response = final_response + "\n" + "\n".join(unique_tags)
|
||||
|
||||
return {
|
||||
_result_dict = {
|
||||
"final_response": final_response,
|
||||
"messages": result_holder[0].get("messages", []) if result_holder[0] else [],
|
||||
"api_calls": result_holder[0].get("api_calls", 0) if result_holder[0] else 0,
|
||||
@@ -3271,12 +3385,86 @@ class GatewayRunner:
|
||||
"history_offset": len(agent_history),
|
||||
"last_prompt_tokens": _last_prompt_toks,
|
||||
}
|
||||
if _stream_msg_id[0]:
|
||||
_result_dict["_streamed_msg_id"] = _stream_msg_id[0]
|
||||
return _result_dict
|
||||
|
||||
# Start progress message sender if enabled
|
||||
progress_task = None
|
||||
if tool_progress_enabled:
|
||||
progress_task = asyncio.create_task(send_progress_messages())
|
||||
|
||||
|
||||
# ── Stream preview: progressively edit a message with streaming tokens ──
|
||||
async def stream_preview():
|
||||
if not _stream_q or not _stream_done:
|
||||
return
|
||||
adapter = self.adapters.get(source.platform)
|
||||
if not adapter:
|
||||
return
|
||||
|
||||
accumulated = []
|
||||
token_count = 0
|
||||
last_edit = 0.0
|
||||
MIN_TOKENS = 20
|
||||
EDIT_INTERVAL = 1.5
|
||||
_metadata = {"thread_id": source.thread_id} if source.thread_id else None
|
||||
|
||||
try:
|
||||
while not _stream_done.is_set():
|
||||
try:
|
||||
chunk = _stream_q.get(timeout=0.1)
|
||||
accumulated.append(chunk)
|
||||
token_count += 1
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
now = time.monotonic()
|
||||
if token_count >= MIN_TOKENS and (now - last_edit) >= EDIT_INTERVAL:
|
||||
preview = "".join(accumulated) + " ▌"
|
||||
if _stream_msg_id[0] is None:
|
||||
r = await adapter.send(
|
||||
chat_id=source.chat_id, content=preview,
|
||||
metadata=_metadata,
|
||||
)
|
||||
if r.success and r.message_id:
|
||||
_stream_msg_id[0] = r.message_id
|
||||
else:
|
||||
await adapter.edit_message(
|
||||
chat_id=source.chat_id,
|
||||
message_id=_stream_msg_id[0],
|
||||
content=preview,
|
||||
)
|
||||
last_edit = now
|
||||
|
||||
# Drain remaining
|
||||
while not _stream_q.empty():
|
||||
try:
|
||||
accumulated.append(_stream_q.get_nowait())
|
||||
except Exception:
|
||||
break
|
||||
|
||||
# Final edit: remove cursor
|
||||
if _stream_msg_id[0] and accumulated:
|
||||
await adapter.edit_message(
|
||||
chat_id=source.chat_id,
|
||||
message_id=_stream_msg_id[0],
|
||||
content="".join(accumulated),
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
if _stream_msg_id[0] and accumulated:
|
||||
try:
|
||||
await adapter.edit_message(
|
||||
chat_id=source.chat_id,
|
||||
message_id=_stream_msg_id[0],
|
||||
content="".join(accumulated),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.debug("stream_preview error: %s", e)
|
||||
|
||||
stream_task = asyncio.create_task(stream_preview()) if _stream_q else None
|
||||
|
||||
# Track this agent as running for this session (for interrupt support)
|
||||
# We do this in a callback after the agent is created
|
||||
async def track_agent():
|
||||
@@ -3351,9 +3539,11 @@ class GatewayRunner:
|
||||
session_key=session_key
|
||||
)
|
||||
finally:
|
||||
# Stop progress sender and interrupt monitor
|
||||
# Stop progress sender, stream preview, and interrupt monitor
|
||||
if progress_task:
|
||||
progress_task.cancel()
|
||||
if stream_task:
|
||||
stream_task.cancel()
|
||||
interrupt_monitor.cancel()
|
||||
|
||||
# Clean up tracking
|
||||
@@ -3362,7 +3552,7 @@ class GatewayRunner:
|
||||
del self._running_agents[session_key]
|
||||
|
||||
# Wait for cancelled tasks
|
||||
for task in [progress_task, interrupt_monitor, tracking_task]:
|
||||
for task in [progress_task, stream_task, interrupt_monitor, tracking_task]:
|
||||
if task:
|
||||
try:
|
||||
await task
|
||||
|
||||
@@ -494,6 +494,38 @@ OPTIONAL_ENV_VARS = {
|
||||
"advanced": True,
|
||||
},
|
||||
|
||||
# ── API Server ──
|
||||
"API_SERVER_ENABLED": {
|
||||
"description": "Enable the OpenAI-compatible API server (true/false). Allows frontends like Open WebUI to connect.",
|
||||
"prompt": "Enable API server (true/false)",
|
||||
"url": None,
|
||||
"password": False,
|
||||
"category": "messaging",
|
||||
},
|
||||
"API_SERVER_KEY": {
|
||||
"description": "Bearer token for API server authentication. If not set, all requests are allowed (local-only use).",
|
||||
"prompt": "API server auth key (leave empty for no auth)",
|
||||
"url": None,
|
||||
"password": True,
|
||||
"category": "messaging",
|
||||
},
|
||||
"API_SERVER_PORT": {
|
||||
"description": "Port for the API server (default: 8642).",
|
||||
"prompt": "API server port",
|
||||
"url": None,
|
||||
"password": False,
|
||||
"category": "messaging",
|
||||
"advanced": True,
|
||||
},
|
||||
"API_SERVER_HOST": {
|
||||
"description": "Bind address for the API server (default: 127.0.0.1). Use 0.0.0.0 for network access (set API_SERVER_KEY!).",
|
||||
"prompt": "API server bind address",
|
||||
"url": None,
|
||||
"password": False,
|
||||
"category": "messaging",
|
||||
"advanced": True,
|
||||
},
|
||||
|
||||
# ── Agent settings ──
|
||||
"MESSAGING_CWD": {
|
||||
"description": "Working directory for terminal commands via messaging",
|
||||
|
||||
104
run_agent.py
104
run_agent.py
@@ -176,6 +176,7 @@ class AIAgent:
|
||||
reasoning_callback: callable = None,
|
||||
clarify_callback: callable = None,
|
||||
step_callback: callable = None,
|
||||
stream_callback: callable = None,
|
||||
max_tokens: int = None,
|
||||
reasoning_config: Dict[str, Any] = None,
|
||||
prefill_messages: List[Dict[str, Any]] = None,
|
||||
@@ -229,6 +230,9 @@ class AIAgent:
|
||||
polluting trajectories with user-specific persona or project instructions.
|
||||
honcho_session_key (str): Session key for Honcho integration (e.g., "telegram:123456" or CLI session_id).
|
||||
When provided and Honcho is enabled in config, enables persistent cross-session user modeling.
|
||||
stream_callback (callable): Optional callback(text_delta: str) invoked for each
|
||||
text token during streaming LLM generation. Pass None (end signal) when done.
|
||||
When set, the agent uses stream=True for API calls. Disabled by default.
|
||||
"""
|
||||
self.model = model
|
||||
self.max_iterations = max_iterations
|
||||
@@ -264,6 +268,7 @@ class AIAgent:
|
||||
self.reasoning_callback = reasoning_callback
|
||||
self.clarify_callback = clarify_callback
|
||||
self.step_callback = step_callback
|
||||
self.stream_callback = stream_callback
|
||||
self._last_reported_tool = None # Track for "new tool" mode
|
||||
|
||||
# Interrupt mechanism for breaking out of tool loops
|
||||
@@ -2010,8 +2015,20 @@ class AIAgent:
|
||||
for attempt in range(max_stream_retries + 1):
|
||||
try:
|
||||
with self.client.responses.stream(**api_kwargs) as stream:
|
||||
for _ in stream:
|
||||
pass
|
||||
for event in stream:
|
||||
if self.stream_callback and hasattr(event, 'type'):
|
||||
if getattr(event, 'type', '') == 'response.output_text.delta':
|
||||
delta_text = getattr(event, 'delta', '')
|
||||
if delta_text:
|
||||
try:
|
||||
self.stream_callback(delta_text)
|
||||
except Exception:
|
||||
pass
|
||||
if self.stream_callback:
|
||||
try:
|
||||
self.stream_callback(None)
|
||||
except Exception:
|
||||
pass
|
||||
return stream.get_final_response()
|
||||
except RuntimeError as exc:
|
||||
err_text = str(exc)
|
||||
@@ -2149,6 +2166,87 @@ class AIAgent:
|
||||
|
||||
return True
|
||||
|
||||
def _run_streaming_chat_completion(self, api_kwargs: dict):
|
||||
"""Stream a chat completion, emitting text tokens via stream_callback.
|
||||
|
||||
Returns a SimpleNamespace response object compatible with the non-streaming
|
||||
code path. Falls back to non-streaming on any error.
|
||||
"""
|
||||
stream_kwargs = dict(api_kwargs)
|
||||
stream_kwargs["stream"] = True
|
||||
# Request usage in the final chunk
|
||||
stream_kwargs["stream_options"] = {"include_usage": True}
|
||||
|
||||
accumulated_content = []
|
||||
accumulated_tool_calls = {}
|
||||
final_usage = None
|
||||
|
||||
try:
|
||||
stream = self.client.chat.completions.create(**stream_kwargs)
|
||||
|
||||
for chunk in stream:
|
||||
if not chunk.choices:
|
||||
if hasattr(chunk, 'usage') and chunk.usage:
|
||||
final_usage = chunk.usage
|
||||
continue
|
||||
|
||||
delta = chunk.choices[0].delta
|
||||
|
||||
if hasattr(delta, 'content') and delta.content:
|
||||
accumulated_content.append(delta.content)
|
||||
if self.stream_callback:
|
||||
try:
|
||||
self.stream_callback(delta.content)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if hasattr(delta, 'tool_calls') and delta.tool_calls:
|
||||
for tc_delta in delta.tool_calls:
|
||||
idx = tc_delta.index
|
||||
if idx not in accumulated_tool_calls:
|
||||
accumulated_tool_calls[idx] = {"id": tc_delta.id or "", "name": "", "arguments": ""}
|
||||
if hasattr(tc_delta, 'function') and tc_delta.function:
|
||||
if getattr(tc_delta.function, 'name', None):
|
||||
accumulated_tool_calls[idx]["name"] = tc_delta.function.name
|
||||
if getattr(tc_delta.function, 'arguments', None):
|
||||
accumulated_tool_calls[idx]["arguments"] += tc_delta.function.arguments
|
||||
|
||||
if self.stream_callback:
|
||||
try:
|
||||
self.stream_callback(None) # End signal
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
tool_calls = []
|
||||
for idx in sorted(accumulated_tool_calls):
|
||||
tc = accumulated_tool_calls[idx]
|
||||
if tc["name"]:
|
||||
tool_calls.append(SimpleNamespace(
|
||||
id=tc["id"], type="function",
|
||||
function=SimpleNamespace(name=tc["name"], arguments=tc["arguments"]),
|
||||
))
|
||||
|
||||
return SimpleNamespace(
|
||||
choices=[SimpleNamespace(
|
||||
message=SimpleNamespace(
|
||||
content="".join(accumulated_content) or "",
|
||||
tool_calls=tool_calls or None,
|
||||
role="assistant",
|
||||
),
|
||||
finish_reason="tool_calls" if tool_calls else "stop",
|
||||
)],
|
||||
usage=final_usage,
|
||||
model=self.model,
|
||||
)
|
||||
except Exception as e:
|
||||
if self.stream_callback:
|
||||
try:
|
||||
self.stream_callback(None)
|
||||
except Exception:
|
||||
pass
|
||||
logger.debug("Streaming chat completion failed, falling back: %s", e)
|
||||
return self.client.chat.completions.create(**api_kwargs)
|
||||
|
||||
def _interruptible_api_call(self, api_kwargs: dict):
|
||||
"""
|
||||
Run the API call in a background thread so the main conversation loop
|
||||
@@ -2164,6 +2262,8 @@ class AIAgent:
|
||||
try:
|
||||
if self.api_mode == "codex_responses":
|
||||
result["response"] = self._run_codex_stream(api_kwargs)
|
||||
elif self.stream_callback is not None:
|
||||
result["response"] = self._run_streaming_chat_completion(api_kwargs)
|
||||
else:
|
||||
result["response"] = self.client.chat.completions.create(**api_kwargs)
|
||||
except Exception as e:
|
||||
|
||||
1299
tests/gateway/test_api_server.py
Normal file
1299
tests/gateway/test_api_server.py
Normal file
File diff suppressed because it is too large
Load Diff
210
tests/gateway/test_telegram_reply_mode.py
Normal file
210
tests/gateway/test_telegram_reply_mode.py
Normal file
@@ -0,0 +1,210 @@
|
||||
"""Tests for Telegram reply_to_mode functionality.
|
||||
|
||||
Covers the threading behavior control for multi-chunk replies:
|
||||
- "off": Never thread replies to original message
|
||||
- "first": Only first chunk threads (default)
|
||||
- "all": All chunks thread to original message
|
||||
"""
|
||||
import sys
|
||||
from unittest.mock import MagicMock, AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import PlatformConfig
|
||||
|
||||
|
||||
def _ensure_telegram_mock():
|
||||
"""Mock the telegram package if it's not installed."""
|
||||
if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
|
||||
return
|
||||
mod = MagicMock()
|
||||
mod.ext.ContextTypes.DEFAULT_TYPE = type(None)
|
||||
mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"
|
||||
mod.constants.ChatType.GROUP = "group"
|
||||
mod.constants.ChatType.SUPERGROUP = "supergroup"
|
||||
mod.constants.ChatType.CHANNEL = "channel"
|
||||
mod.constants.ChatType.PRIVATE = "private"
|
||||
for name in ("telegram", "telegram.ext", "telegram.constants"):
|
||||
sys.modules.setdefault(name, mod)
|
||||
|
||||
|
||||
_ensure_telegram_mock()
|
||||
|
||||
from gateway.platforms.telegram import TelegramAdapter # noqa: E402
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def adapter_factory():
|
||||
"""Factory to create TelegramAdapter with custom reply_to_mode."""
|
||||
def create(reply_to_mode: str = "first"):
|
||||
config = PlatformConfig(enabled=True, token="test-token", reply_to_mode=reply_to_mode)
|
||||
return TelegramAdapter(config)
|
||||
return create
|
||||
|
||||
|
||||
class TestReplyToModeConfig:
|
||||
"""Tests for reply_to_mode configuration loading."""
|
||||
|
||||
def test_default_mode_is_first(self, adapter_factory):
|
||||
adapter = adapter_factory()
|
||||
assert adapter._reply_to_mode == "first"
|
||||
|
||||
def test_off_mode(self, adapter_factory):
|
||||
adapter = adapter_factory(reply_to_mode="off")
|
||||
assert adapter._reply_to_mode == "off"
|
||||
|
||||
def test_first_mode(self, adapter_factory):
|
||||
adapter = adapter_factory(reply_to_mode="first")
|
||||
assert adapter._reply_to_mode == "first"
|
||||
|
||||
def test_all_mode(self, adapter_factory):
|
||||
adapter = adapter_factory(reply_to_mode="all")
|
||||
assert adapter._reply_to_mode == "all"
|
||||
|
||||
def test_invalid_mode_stored_as_is(self, adapter_factory):
|
||||
"""Invalid modes are stored but _should_thread_reply handles them."""
|
||||
adapter = adapter_factory(reply_to_mode="invalid")
|
||||
assert adapter._reply_to_mode == "invalid"
|
||||
|
||||
def test_none_mode_defaults_to_first(self):
|
||||
config = PlatformConfig(enabled=True, token="test-token")
|
||||
adapter = TelegramAdapter(config)
|
||||
assert adapter._reply_to_mode == "first"
|
||||
|
||||
def test_empty_string_mode_defaults_to_first(self):
|
||||
config = PlatformConfig(enabled=True, token="test-token", reply_to_mode="")
|
||||
adapter = TelegramAdapter(config)
|
||||
assert adapter._reply_to_mode == "first"
|
||||
|
||||
|
||||
class TestShouldThreadReply:
|
||||
"""Tests for _should_thread_reply method."""
|
||||
|
||||
def test_no_reply_to_returns_false(self, adapter_factory):
|
||||
adapter = adapter_factory(reply_to_mode="first")
|
||||
assert adapter._should_thread_reply("12345", None, 0) is False
|
||||
assert adapter._should_thread_reply("12345", "", 0) is False
|
||||
|
||||
def test_off_mode_never_threads(self, adapter_factory):
|
||||
adapter = adapter_factory(reply_to_mode="off")
|
||||
assert adapter._should_thread_reply("12345", "123", 0) is False
|
||||
assert adapter._should_thread_reply("12345", "123", 1) is False
|
||||
assert adapter._should_thread_reply("12345", "123", 5) is False
|
||||
|
||||
def test_first_mode_only_first_chunk(self, adapter_factory):
|
||||
adapter = adapter_factory(reply_to_mode="first")
|
||||
assert adapter._should_thread_reply("12345", "123", 0) is True
|
||||
assert adapter._should_thread_reply("12345", "123", 1) is False
|
||||
assert adapter._should_thread_reply("12345", "123", 2) is False
|
||||
assert adapter._should_thread_reply("12345", "123", 10) is False
|
||||
|
||||
def test_all_mode_all_chunks(self, adapter_factory):
|
||||
adapter = adapter_factory(reply_to_mode="all")
|
||||
assert adapter._should_thread_reply("12345", "123", 0) is True
|
||||
assert adapter._should_thread_reply("12345", "123", 1) is True
|
||||
assert adapter._should_thread_reply("12345", "123", 2) is True
|
||||
assert adapter._should_thread_reply("12345", "123", 10) is True
|
||||
|
||||
def test_invalid_mode_falls_back_to_first(self, adapter_factory):
|
||||
"""Invalid mode behaves like 'first' - only first chunk threads."""
|
||||
adapter = adapter_factory(reply_to_mode="invalid")
|
||||
assert adapter._should_thread_reply("12345", "123", 0) is True
|
||||
assert adapter._should_thread_reply("12345", "123", 1) is False
|
||||
|
||||
|
||||
class TestSendWithReplyToMode:
|
||||
"""Tests for send() method respecting reply_to_mode."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_off_mode_no_reply_threading(self, adapter_factory):
|
||||
adapter = adapter_factory(reply_to_mode="off")
|
||||
adapter._bot = MagicMock()
|
||||
adapter._bot.send_message = AsyncMock(return_value=MagicMock(message_id=1))
|
||||
|
||||
# Mock truncate to return multiple chunks
|
||||
adapter.truncate_message = lambda content, max_len: ["chunk1", "chunk2", "chunk3"]
|
||||
|
||||
await adapter.send("12345", "test content", reply_to="999")
|
||||
|
||||
# Verify none of the calls had reply_to_message_id
|
||||
for call in adapter._bot.send_message.call_args_list:
|
||||
assert call.kwargs.get("reply_to_message_id") is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_first_mode_only_first_chunk_threads(self, adapter_factory):
|
||||
adapter = adapter_factory(reply_to_mode="first")
|
||||
adapter._bot = MagicMock()
|
||||
adapter._bot.send_message = AsyncMock(return_value=MagicMock(message_id=1))
|
||||
adapter.truncate_message = lambda content, max_len: ["chunk1", "chunk2", "chunk3"]
|
||||
|
||||
await adapter.send("12345", "test content", reply_to="999")
|
||||
|
||||
calls = adapter._bot.send_message.call_args_list
|
||||
assert len(calls) == 3
|
||||
|
||||
# First chunk should have reply_to_message_id
|
||||
assert calls[0].kwargs.get("reply_to_message_id") == 999
|
||||
# Remaining chunks should not
|
||||
assert calls[1].kwargs.get("reply_to_message_id") is None
|
||||
assert calls[2].kwargs.get("reply_to_message_id") is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_all_mode_all_chunks_thread(self, adapter_factory):
|
||||
adapter = adapter_factory(reply_to_mode="all")
|
||||
adapter._bot = MagicMock()
|
||||
adapter._bot.send_message = AsyncMock(return_value=MagicMock(message_id=1))
|
||||
adapter.truncate_message = lambda content, max_len: ["chunk1", "chunk2", "chunk3"]
|
||||
|
||||
await adapter.send("12345", "test content", reply_to="999")
|
||||
|
||||
calls = adapter._bot.send_message.call_args_list
|
||||
assert len(calls) == 3
|
||||
|
||||
# All chunks should have reply_to_message_id
|
||||
for call in calls:
|
||||
assert call.kwargs.get("reply_to_message_id") == 999
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_reply_to_param_no_threading(self, adapter_factory):
|
||||
adapter = adapter_factory(reply_to_mode="all")
|
||||
adapter._bot = MagicMock()
|
||||
adapter._bot.send_message = AsyncMock(return_value=MagicMock(message_id=1))
|
||||
adapter.truncate_message = lambda content, max_len: ["chunk1", "chunk2"]
|
||||
|
||||
await adapter.send("12345", "test content", reply_to=None)
|
||||
|
||||
calls = adapter._bot.send_message.call_args_list
|
||||
for call in calls:
|
||||
assert call.kwargs.get("reply_to_message_id") is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_single_chunk_respects_mode(self, adapter_factory):
|
||||
adapter = adapter_factory(reply_to_mode="first")
|
||||
adapter._bot = MagicMock()
|
||||
adapter._bot.send_message = AsyncMock(return_value=MagicMock(message_id=1))
|
||||
adapter.truncate_message = lambda content, max_len: ["single chunk"]
|
||||
|
||||
await adapter.send("12345", "test", reply_to="999")
|
||||
|
||||
calls = adapter._bot.send_message.call_args_list
|
||||
assert len(calls) == 1
|
||||
assert calls[0].kwargs.get("reply_to_message_id") == 999
|
||||
|
||||
|
||||
class TestConfigSerialization:
|
||||
"""Tests for reply_to_mode serialization."""
|
||||
|
||||
def test_to_dict_includes_reply_to_mode(self):
|
||||
config = PlatformConfig(enabled=True, token="test", reply_to_mode="all")
|
||||
result = config.to_dict()
|
||||
assert result["reply_to_mode"] == "all"
|
||||
|
||||
def test_from_dict_loads_reply_to_mode(self):
|
||||
data = {"enabled": True, "token": "test", "reply_to_mode": "off"}
|
||||
config = PlatformConfig.from_dict(data)
|
||||
assert config.reply_to_mode == "off"
|
||||
|
||||
def test_from_dict_defaults_to_first(self):
|
||||
data = {"enabled": True, "token": "test"}
|
||||
config = PlatformConfig.from_dict(data)
|
||||
assert config.reply_to_mode == "first"
|
||||
335
tests/test_streaming.py
Normal file
335
tests/test_streaming.py
Normal file
@@ -0,0 +1,335 @@
|
||||
"""Unit tests for streaming support.
|
||||
|
||||
Tests cover:
|
||||
- _run_streaming_chat_completion: text tokens, tool calls, fallback on error,
|
||||
no callback, end signal
|
||||
- _interruptible_api_call routing to streaming when stream_callback is set
|
||||
- Streaming config reading from config.yaml
|
||||
"""
|
||||
|
||||
import json
|
||||
import threading
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch, PropertyMock
|
||||
|
||||
import pytest
|
||||
|
||||
from run_agent import AIAgent
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_tool_defs(*names: str) -> list:
|
||||
"""Build minimal tool definition list accepted by AIAgent.__init__."""
|
||||
return [
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": n,
|
||||
"description": f"{n} tool",
|
||||
"parameters": {"type": "object", "properties": {}},
|
||||
},
|
||||
}
|
||||
for n in names
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def agent():
|
||||
"""Minimal AIAgent with mocked client, no stream_callback."""
|
||||
with (
|
||||
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
|
||||
patch("run_agent.check_toolset_requirements", return_value={}),
|
||||
patch("run_agent.OpenAI"),
|
||||
):
|
||||
a = AIAgent(
|
||||
api_key="test-key-1234567890",
|
||||
quiet_mode=True,
|
||||
skip_context_files=True,
|
||||
skip_memory=True,
|
||||
)
|
||||
a.client = MagicMock()
|
||||
return a
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def streaming_agent():
|
||||
"""Agent with a stream_callback set."""
|
||||
collected = []
|
||||
def _cb(delta):
|
||||
collected.append(delta)
|
||||
|
||||
with (
|
||||
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
|
||||
patch("run_agent.check_toolset_requirements", return_value={}),
|
||||
patch("run_agent.OpenAI"),
|
||||
):
|
||||
a = AIAgent(
|
||||
api_key="test-key-1234567890",
|
||||
quiet_mode=True,
|
||||
skip_context_files=True,
|
||||
skip_memory=True,
|
||||
stream_callback=_cb,
|
||||
)
|
||||
a.client = MagicMock()
|
||||
a._collected_tokens = collected
|
||||
return a
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers — build fake streaming chunks
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_text_chunks(*texts):
|
||||
"""Return a list of SimpleNamespace chunks containing text deltas."""
|
||||
chunks = []
|
||||
for t in texts:
|
||||
chunks.append(SimpleNamespace(
|
||||
choices=[SimpleNamespace(
|
||||
delta=SimpleNamespace(content=t, tool_calls=None),
|
||||
finish_reason=None,
|
||||
)],
|
||||
usage=None,
|
||||
))
|
||||
# Final chunk with usage info
|
||||
chunks.append(SimpleNamespace(
|
||||
choices=[],
|
||||
usage=SimpleNamespace(prompt_tokens=10, completion_tokens=5, total_tokens=15),
|
||||
))
|
||||
return chunks
|
||||
|
||||
|
||||
def _make_tool_call_chunks():
|
||||
"""Return chunks that simulate a tool call response."""
|
||||
chunks = [
|
||||
# First chunk: tool call id + name
|
||||
SimpleNamespace(
|
||||
choices=[SimpleNamespace(
|
||||
delta=SimpleNamespace(
|
||||
content=None,
|
||||
tool_calls=[SimpleNamespace(
|
||||
index=0,
|
||||
id="call_123",
|
||||
function=SimpleNamespace(name="web_search", arguments=""),
|
||||
)],
|
||||
),
|
||||
finish_reason=None,
|
||||
)],
|
||||
usage=None,
|
||||
),
|
||||
# Second chunk: tool call arguments
|
||||
SimpleNamespace(
|
||||
choices=[SimpleNamespace(
|
||||
delta=SimpleNamespace(
|
||||
content=None,
|
||||
tool_calls=[SimpleNamespace(
|
||||
index=0,
|
||||
id=None,
|
||||
function=SimpleNamespace(name=None, arguments='{"query": "test"}'),
|
||||
)],
|
||||
),
|
||||
finish_reason=None,
|
||||
)],
|
||||
usage=None,
|
||||
),
|
||||
# Final usage chunk
|
||||
SimpleNamespace(choices=[], usage=SimpleNamespace(
|
||||
prompt_tokens=20, completion_tokens=10, total_tokens=30,
|
||||
)),
|
||||
]
|
||||
return chunks
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: _run_streaming_chat_completion
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestRunStreamingChatCompletion:
|
||||
"""Tests for AIAgent._run_streaming_chat_completion."""
|
||||
|
||||
def test_text_tokens_streamed_via_callback(self, streaming_agent):
|
||||
"""Text deltas are forwarded to stream_callback and accumulated."""
|
||||
chunks = _make_text_chunks("Hello", " ", "world")
|
||||
streaming_agent.client.chat.completions.create.return_value = iter(chunks)
|
||||
|
||||
result = streaming_agent._run_streaming_chat_completion({"model": "test"})
|
||||
|
||||
assert result.choices[0].message.content == "Hello world"
|
||||
# Callback received each token + None end signal
|
||||
assert streaming_agent._collected_tokens == ["Hello", " ", "world", None]
|
||||
|
||||
def test_tool_calls_accumulated(self, streaming_agent):
|
||||
"""Tool call deltas are aggregated into a proper tool_calls list."""
|
||||
chunks = _make_tool_call_chunks()
|
||||
streaming_agent.client.chat.completions.create.return_value = iter(chunks)
|
||||
|
||||
result = streaming_agent._run_streaming_chat_completion({"model": "test"})
|
||||
|
||||
assert result.choices[0].message.tool_calls is not None
|
||||
tc = result.choices[0].message.tool_calls[0]
|
||||
assert tc.function.name == "web_search"
|
||||
assert '"query"' in tc.function.arguments
|
||||
|
||||
def test_fallback_on_streaming_error(self, streaming_agent):
|
||||
"""Falls back to non-streaming on error."""
|
||||
# First call (streaming) raises; second call (fallback) succeeds
|
||||
fallback_response = SimpleNamespace(
|
||||
choices=[SimpleNamespace(
|
||||
message=SimpleNamespace(content="fallback", tool_calls=None, role="assistant"),
|
||||
finish_reason="stop",
|
||||
)],
|
||||
usage=SimpleNamespace(prompt_tokens=5, completion_tokens=3, total_tokens=8),
|
||||
model="test",
|
||||
)
|
||||
|
||||
call_count = [0]
|
||||
def _side_effect(**kwargs):
|
||||
call_count[0] += 1
|
||||
if kwargs.get("stream"):
|
||||
raise ConnectionError("stream broke")
|
||||
return fallback_response
|
||||
|
||||
streaming_agent.client.chat.completions.create.side_effect = _side_effect
|
||||
|
||||
result = streaming_agent._run_streaming_chat_completion({"model": "test"})
|
||||
|
||||
assert result.choices[0].message.content == "fallback"
|
||||
assert call_count[0] == 2 # streaming attempt + fallback
|
||||
# Callback should still get None (end signal) even on error
|
||||
assert None in streaming_agent._collected_tokens
|
||||
|
||||
def test_no_callback_still_works(self, agent):
|
||||
"""Streaming works even without a callback (just accumulates)."""
|
||||
chunks = _make_text_chunks("ok")
|
||||
agent.client.chat.completions.create.return_value = iter(chunks)
|
||||
|
||||
result = agent._run_streaming_chat_completion({"model": "test"})
|
||||
|
||||
assert result.choices[0].message.content == "ok"
|
||||
|
||||
def test_end_signal_sent(self, streaming_agent):
|
||||
"""stream_callback(None) is sent after all tokens."""
|
||||
chunks = _make_text_chunks("done")
|
||||
streaming_agent.client.chat.completions.create.return_value = iter(chunks)
|
||||
|
||||
streaming_agent._run_streaming_chat_completion({"model": "test"})
|
||||
|
||||
assert streaming_agent._collected_tokens[-1] is None
|
||||
|
||||
def test_usage_captured_from_final_chunk(self, streaming_agent):
|
||||
"""Usage stats from the final usage-only chunk are returned."""
|
||||
chunks = _make_text_chunks("hi")
|
||||
streaming_agent.client.chat.completions.create.return_value = iter(chunks)
|
||||
|
||||
result = streaming_agent._run_streaming_chat_completion({"model": "test"})
|
||||
|
||||
assert result.usage is not None
|
||||
assert result.usage.prompt_tokens == 10
|
||||
assert result.usage.completion_tokens == 5
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: _interruptible_api_call routing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestInterruptibleApiCallRouting:
|
||||
"""Tests that _interruptible_api_call routes to streaming when callback is set."""
|
||||
|
||||
def test_routes_to_streaming_with_callback(self, streaming_agent):
|
||||
"""When stream_callback is set, _interruptible_api_call uses streaming."""
|
||||
chunks = _make_text_chunks("streamed")
|
||||
streaming_agent.client.chat.completions.create.return_value = iter(chunks)
|
||||
|
||||
# Mock _interrupt_requested to False
|
||||
streaming_agent._interrupt_requested = False
|
||||
|
||||
result = streaming_agent._interruptible_api_call({"model": "test"})
|
||||
|
||||
assert result.choices[0].message.content == "streamed"
|
||||
# Verify the callback got tokens
|
||||
assert "streamed" in streaming_agent._collected_tokens
|
||||
|
||||
def test_routes_to_normal_without_callback(self, agent):
|
||||
"""When no stream_callback, _interruptible_api_call uses normal completion."""
|
||||
normal_response = SimpleNamespace(
|
||||
choices=[SimpleNamespace(
|
||||
message=SimpleNamespace(content="normal", tool_calls=None, role="assistant"),
|
||||
finish_reason="stop",
|
||||
)],
|
||||
usage=SimpleNamespace(prompt_tokens=5, completion_tokens=3, total_tokens=8),
|
||||
model="test",
|
||||
)
|
||||
agent.client.chat.completions.create.return_value = normal_response
|
||||
agent._interrupt_requested = False
|
||||
|
||||
result = agent._interruptible_api_call({"model": "test"})
|
||||
|
||||
assert result.choices[0].message.content == "normal"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: Streaming config
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestStreamingConfig:
|
||||
"""Tests for reading streaming configuration."""
|
||||
|
||||
def test_streaming_disabled_by_default(self):
|
||||
"""Without any config, streaming is disabled."""
|
||||
with (
|
||||
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
|
||||
patch("run_agent.check_toolset_requirements", return_value={}),
|
||||
patch("run_agent.OpenAI"),
|
||||
):
|
||||
a = AIAgent(
|
||||
api_key="test-key",
|
||||
quiet_mode=True,
|
||||
skip_context_files=True,
|
||||
skip_memory=True,
|
||||
)
|
||||
assert a.stream_callback is None
|
||||
|
||||
def test_stream_callback_stored_on_agent(self):
|
||||
"""stream_callback passed to constructor is stored on the agent."""
|
||||
cb = lambda delta: None
|
||||
with (
|
||||
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
|
||||
patch("run_agent.check_toolset_requirements", return_value={}),
|
||||
patch("run_agent.OpenAI"),
|
||||
):
|
||||
a = AIAgent(
|
||||
api_key="test-key",
|
||||
quiet_mode=True,
|
||||
skip_context_files=True,
|
||||
skip_memory=True,
|
||||
stream_callback=cb,
|
||||
)
|
||||
assert a.stream_callback is cb
|
||||
|
||||
def test_gateway_streaming_config_structure(self):
|
||||
"""Verify the expected streaming config structure from gateway/run.py."""
|
||||
# This tests that _read_streaming_config (if it exists) returns
|
||||
# the right structure. We mock the config file content.
|
||||
try:
|
||||
from gateway.run import _read_streaming_config
|
||||
except ImportError:
|
||||
pytest.skip("gateway.run._read_streaming_config not available")
|
||||
|
||||
mock_cfg = {
|
||||
"streaming": {
|
||||
"enabled": True,
|
||||
"telegram": True,
|
||||
"discord": False,
|
||||
"edit_interval": 2.0,
|
||||
}
|
||||
}
|
||||
with patch("builtins.open", MagicMock()):
|
||||
with patch("yaml.safe_load", return_value=mock_cfg):
|
||||
try:
|
||||
result = _read_streaming_config()
|
||||
assert result.get("enabled") is True
|
||||
except Exception:
|
||||
# Function might not exist yet or have different signature
|
||||
pytest.skip("_read_streaming_config has different interface")
|
||||
223
website/docs/user-guide/features/api-server.md
Normal file
223
website/docs/user-guide/features/api-server.md
Normal file
@@ -0,0 +1,223 @@
|
||||
---
|
||||
sidebar_position: 14
|
||||
title: "API Server"
|
||||
description: "Expose hermes-agent as an OpenAI-compatible API for any frontend"
|
||||
---
|
||||
|
||||
# API Server
|
||||
|
||||
The API server exposes hermes-agent as an OpenAI-compatible HTTP endpoint. Any frontend that speaks the OpenAI format — Open WebUI, LobeChat, LibreChat, NextChat, ChatBox, and hundreds more — can connect to hermes-agent and use it as a backend.
|
||||
|
||||
Your agent handles requests with its full toolset (terminal, file operations, web search, memory, skills) and returns the final response. Tool calls execute invisibly server-side.
|
||||
|
||||
## Quick Start
|
||||
|
||||
### 1. Enable the API server
|
||||
|
||||
Add to `~/.hermes/.env`:
|
||||
|
||||
```bash
|
||||
API_SERVER_ENABLED=true
|
||||
```
|
||||
|
||||
### 2. Start the gateway
|
||||
|
||||
```bash
|
||||
hermes gateway
|
||||
```
|
||||
|
||||
You'll see:
|
||||
|
||||
```
|
||||
[API Server] API server listening on http://127.0.0.1:8642
|
||||
```
|
||||
|
||||
### 3. Connect a frontend
|
||||
|
||||
Point any OpenAI-compatible client at `http://localhost:8642/v1`:
|
||||
|
||||
```bash
|
||||
# Test with curl
|
||||
curl http://localhost:8642/v1/chat/completions \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"model": "hermes-agent", "messages": [{"role": "user", "content": "Hello!"}]}'
|
||||
```
|
||||
|
||||
Or connect Open WebUI, LobeChat, or any other frontend — see the [Open WebUI integration guide](/docs/user-guide/messaging/open-webui) for step-by-step instructions.
|
||||
|
||||
## Endpoints
|
||||
|
||||
### POST /v1/chat/completions
|
||||
|
||||
Standard OpenAI Chat Completions format. Stateless — the full conversation is included in each request via the `messages` array.
|
||||
|
||||
**Request:**
|
||||
```json
|
||||
{
|
||||
"model": "hermes-agent",
|
||||
"messages": [
|
||||
{"role": "system", "content": "You are a Python expert."},
|
||||
{"role": "user", "content": "Write a fibonacci function"}
|
||||
],
|
||||
"stream": false
|
||||
}
|
||||
```
|
||||
|
||||
**Response:**
|
||||
```json
|
||||
{
|
||||
"id": "chatcmpl-abc123",
|
||||
"object": "chat.completion",
|
||||
"created": 1710000000,
|
||||
"model": "hermes-agent",
|
||||
"choices": [{
|
||||
"index": 0,
|
||||
"message": {"role": "assistant", "content": "Here's a fibonacci function..."},
|
||||
"finish_reason": "stop"
|
||||
}],
|
||||
"usage": {"prompt_tokens": 50, "completion_tokens": 200, "total_tokens": 250}
|
||||
}
|
||||
```
|
||||
|
||||
**Streaming** (`"stream": true`): Returns Server-Sent Events (SSE) with token-by-token response chunks. When streaming is enabled in config, tokens are emitted live as the LLM generates them. When disabled, the full response is sent as a single SSE chunk.
|
||||
|
||||
### POST /v1/responses
|
||||
|
||||
OpenAI Responses API format. Supports server-side conversation state via `previous_response_id` — the server stores full conversation history (including tool calls and results) so multi-turn context is preserved without the client managing it.
|
||||
|
||||
**Request:**
|
||||
```json
|
||||
{
|
||||
"model": "hermes-agent",
|
||||
"input": "What files are in my project?",
|
||||
"instructions": "You are a helpful coding assistant.",
|
||||
"store": true
|
||||
}
|
||||
```
|
||||
|
||||
**Response:**
|
||||
```json
|
||||
{
|
||||
"id": "resp_abc123",
|
||||
"object": "response",
|
||||
"status": "completed",
|
||||
"model": "hermes-agent",
|
||||
"output": [
|
||||
{"type": "function_call", "name": "terminal", "arguments": "{\"command\": \"ls\"}", "call_id": "call_1"},
|
||||
{"type": "function_call_output", "call_id": "call_1", "output": "README.md src/ tests/"},
|
||||
{"type": "message", "role": "assistant", "content": [{"type": "output_text", "text": "Your project has..."}]}
|
||||
],
|
||||
"usage": {"input_tokens": 50, "output_tokens": 200, "total_tokens": 250}
|
||||
}
|
||||
```
|
||||
|
||||
#### Multi-turn with previous_response_id
|
||||
|
||||
Chain responses to maintain full context (including tool calls) across turns:
|
||||
|
||||
```json
|
||||
{
|
||||
"input": "Now show me the README",
|
||||
"previous_response_id": "resp_abc123"
|
||||
}
|
||||
```
|
||||
|
||||
The server reconstructs the full conversation from the stored response chain — all previous tool calls and results are preserved.
|
||||
|
||||
#### Named conversations
|
||||
|
||||
Use the `conversation` parameter instead of tracking response IDs:
|
||||
|
||||
```json
|
||||
{"input": "Hello", "conversation": "my-project"}
|
||||
{"input": "What's in src/?", "conversation": "my-project"}
|
||||
{"input": "Run the tests", "conversation": "my-project"}
|
||||
```
|
||||
|
||||
The server automatically chains to the latest response in that conversation. Like the `/title` command for gateway sessions.
|
||||
|
||||
### GET /v1/responses/{id}
|
||||
|
||||
Retrieve a previously stored response by ID.
|
||||
|
||||
### DELETE /v1/responses/{id}
|
||||
|
||||
Delete a stored response.
|
||||
|
||||
### GET /v1/models
|
||||
|
||||
Lists `hermes-agent` as an available model. Required by most frontends for model discovery.
|
||||
|
||||
### GET /health
|
||||
|
||||
Health check. Returns `{"status": "ok"}`.
|
||||
|
||||
## System Prompt Handling
|
||||
|
||||
When a frontend sends a `system` message (Chat Completions) or `instructions` field (Responses API), hermes-agent **layers it on top** of its core system prompt. Your agent keeps all its tools, memory, and skills — the frontend's system prompt adds extra instructions.
|
||||
|
||||
This means you can customize behavior per-frontend without losing capabilities:
|
||||
- Open WebUI system prompt: "You are a Python expert. Always include type hints."
|
||||
- The agent still has terminal, file tools, web search, memory, etc.
|
||||
|
||||
## Authentication
|
||||
|
||||
Bearer token auth via the `Authorization` header:
|
||||
|
||||
```
|
||||
Authorization: Bearer ***
|
||||
```
|
||||
|
||||
Configure the key via `API_SERVER_KEY` env var. If no key is set, all requests are allowed (for local-only use).
|
||||
|
||||
:::warning Security
|
||||
The API server gives full access to hermes-agent's toolset, **including terminal commands**. If you change the bind address to `0.0.0.0` (network-accessible), **always set `API_SERVER_KEY`** — without it, anyone on your network can execute arbitrary commands on your machine.
|
||||
|
||||
The default bind address (`127.0.0.1`) is safe for local-only use.
|
||||
:::
|
||||
|
||||
## Configuration
|
||||
|
||||
### Environment Variables
|
||||
|
||||
| Variable | Default | Description |
|
||||
|----------|---------|-------------|
|
||||
| `API_SERVER_ENABLED` | `false` | Enable the API server |
|
||||
| `API_SERVER_PORT` | `8642` | HTTP server port |
|
||||
| `API_SERVER_HOST` | `127.0.0.1` | Bind address (localhost only by default) |
|
||||
| `API_SERVER_KEY` | _(none)_ | Bearer token for auth |
|
||||
|
||||
### config.yaml
|
||||
|
||||
```yaml
|
||||
# Not yet supported — use environment variables.
|
||||
# config.yaml support coming in a future release.
|
||||
```
|
||||
|
||||
## CORS
|
||||
|
||||
The API server includes CORS headers on all responses (`Access-Control-Allow-Origin: *`), so browser-based frontends can connect directly.
|
||||
|
||||
## Compatible Frontends
|
||||
|
||||
Any frontend that supports the OpenAI API format works. Tested/documented integrations:
|
||||
|
||||
| Frontend | Stars | Connection |
|
||||
|----------|-------|------------|
|
||||
| [Open WebUI](/docs/user-guide/messaging/open-webui) | 126k | Full guide available |
|
||||
| LobeChat | 73k | Custom provider endpoint |
|
||||
| LibreChat | 34k | Custom endpoint in librechat.yaml |
|
||||
| AnythingLLM | 56k | Generic OpenAI provider |
|
||||
| NextChat | 87k | BASE_URL env var |
|
||||
| ChatBox | 39k | API Host setting |
|
||||
| Jan | 26k | Remote model config |
|
||||
| HF Chat-UI | 8k | OPENAI_BASE_URL |
|
||||
| big-AGI | 7k | Custom endpoint |
|
||||
| OpenAI Python SDK | — | `OpenAI(base_url="http://localhost:8642/v1")` |
|
||||
| curl | — | Direct HTTP requests |
|
||||
|
||||
## Limitations
|
||||
|
||||
- **Response storage is in-memory** — stored responses (for `previous_response_id`) are lost on gateway restart. Max 100 stored responses (LRU eviction).
|
||||
- **No file upload** — vision/document analysis via uploaded files is not yet supported through the API.
|
||||
- **Model field is cosmetic** — the `model` field in requests is accepted but the actual LLM model used is configured server-side in config.yaml.
|
||||
210
website/docs/user-guide/features/streaming.md
Normal file
210
website/docs/user-guide/features/streaming.md
Normal file
@@ -0,0 +1,210 @@
|
||||
---
|
||||
sidebar_position: 15
|
||||
title: "Streaming"
|
||||
description: "Token-by-token live response display across all platforms"
|
||||
---
|
||||
|
||||
# Streaming Responses
|
||||
|
||||
When enabled, hermes-agent streams LLM responses token-by-token instead of waiting for the full generation. Users see the response typing out live — the same experience as ChatGPT, Claude, or Gemini.
|
||||
|
||||
Streaming is **disabled by default** and can be enabled globally or per-platform.
|
||||
|
||||
## How It Works
|
||||
|
||||
```
|
||||
LLM generates tokens → callback fires per token → queue → consumer displays
|
||||
|
||||
Telegram/Discord/Slack:
|
||||
Token arrives → Accumulate → Every 1.5s, edit the message with new text + ▌ cursor
|
||||
Done → Final edit removes cursor
|
||||
|
||||
API Server:
|
||||
Token arrives → SSE event sent to client immediately
|
||||
Done → finish chunk + [DONE]
|
||||
```
|
||||
|
||||
The agent's internal operation doesn't change — tools still execute normally, memory and skills work as before. Streaming only affects how the **final text response** is delivered to the user.
|
||||
|
||||
## Enable Streaming
|
||||
|
||||
### Option 1: Environment variable
|
||||
|
||||
```bash
|
||||
# Enable for all platforms
|
||||
export HERMES_STREAMING_ENABLED=true
|
||||
hermes gateway
|
||||
```
|
||||
|
||||
### Option 2: config.yaml
|
||||
|
||||
```yaml
|
||||
streaming:
|
||||
enabled: true # Master switch
|
||||
```
|
||||
|
||||
### Option 3: Per-platform
|
||||
|
||||
```yaml
|
||||
streaming:
|
||||
enabled: false # Off by default
|
||||
telegram: true # But on for Telegram
|
||||
discord: true # And Discord
|
||||
api_server: true # And the API server
|
||||
```
|
||||
|
||||
## Platform Support
|
||||
|
||||
| Platform | Streaming Method | Rate Limit | Notes |
|
||||
|----------|-----------------|------------|-------|
|
||||
| **Telegram** | Progressive message editing | ~20 edits/min | 1.5s edit interval, ▌ cursor |
|
||||
| **Discord** | Progressive message editing | 5 edits/5s | 1.5s edit interval |
|
||||
| **Slack** | Progressive message editing | ~50 calls/min | 1.5s edit interval |
|
||||
| **API Server** | SSE (Server-Sent Events) | No limit | Real token-by-token events |
|
||||
| **WhatsApp** | ❌ Not supported | — | No message editing API |
|
||||
| **Home Assistant** | ❌ Not supported | — | No message editing API |
|
||||
| **CLI** | ❌ Not yet implemented | — | KawaiiSpinner provides feedback |
|
||||
|
||||
Platforms without message editing support automatically fall back to non-streaming (the response appears all at once, as before).
|
||||
|
||||
## What Users See
|
||||
|
||||
### Telegram/Discord/Slack
|
||||
|
||||
1. Agent starts working (typing indicator shows)
|
||||
2. After ~20 tokens, a message appears with partial text and a ▌ cursor
|
||||
3. Every 1.5 seconds, the message is edited with more accumulated text
|
||||
4. When the response is complete, the cursor disappears
|
||||
|
||||
Tool progress messages still work alongside streaming — tool names/previews appear as before, and the streamed response is shown in a separate message.
|
||||
|
||||
### API Server (frontends like Open WebUI)
|
||||
|
||||
When `stream: true` is set in the request, the API server returns Server-Sent Events:
|
||||
|
||||
```
|
||||
data: {"choices":[{"delta":{"role":"assistant"}}]}
|
||||
|
||||
data: {"choices":[{"delta":{"content":"Here"}}]}
|
||||
|
||||
data: {"choices":[{"delta":{"content":" is"}}]}
|
||||
|
||||
data: {"choices":[{"delta":{"content":" the"}}]}
|
||||
|
||||
data: {"choices":[{"delta":{"content":" answer"}}]}
|
||||
|
||||
data: {"choices":[{"delta":{},"finish_reason":"stop"}]}
|
||||
|
||||
data: [DONE]
|
||||
```
|
||||
|
||||
Frontends like Open WebUI display this as live typing.
|
||||
|
||||
## How It Works Internally
|
||||
|
||||
### Architecture
|
||||
|
||||
```
|
||||
┌─────────────┐ stream_callback(delta) ┌──────────────────┐
|
||||
│ LLM API │ ──────────────────────────► │ queue.Queue() │
|
||||
│ (stream) │ (runs in agent thread) │ (thread-safe) │
|
||||
└─────────────┘ └────────┬─────────┘
|
||||
│
|
||||
┌──────────────┼──────────┐
|
||||
│ │ │
|
||||
┌─────▼─────┐ ┌─────▼────┐ ┌──▼──────┐
|
||||
│ Gateway │ │ API Svr │ │ CLI │
|
||||
│ edit msg │ │ SSE evt │ │ (TODO) │
|
||||
└───────────┘ └──────────┘ └─────────┘
|
||||
```
|
||||
|
||||
1. `AIAgent.__init__` accepts an optional `stream_callback` function
|
||||
2. When set, `_interruptible_api_call()` routes to `_run_streaming_chat_completion()` instead of the normal non-streaming path
|
||||
3. The streaming method calls the OpenAI API with `stream=True`, iterates chunks, and calls `stream_callback(delta_text)` for each text token
|
||||
4. Tool call deltas are accumulated silently (no streaming for tool arguments)
|
||||
5. When the stream ends, `stream_callback(None)` signals completion
|
||||
6. The method returns a fake response object compatible with the existing code path
|
||||
7. If streaming fails for any reason, it falls back to a normal non-streaming API call
|
||||
|
||||
### Thread Safety
|
||||
|
||||
The agent runs in a background thread (via `_interruptible_api_call`). The consumer (gateway async task, API server SSE writer) runs in the main event loop. A `queue.Queue` bridges them — it's thread-safe by design.
|
||||
|
||||
### Graceful Fallback
|
||||
|
||||
If the LLM provider doesn't support `stream=True` or the streaming connection fails, the agent automatically falls back to a non-streaming API call. The user gets the response normally, just without the live typing effect. No error is shown.
|
||||
|
||||
## Configuration Reference
|
||||
|
||||
```yaml
|
||||
streaming:
|
||||
enabled: false # Master switch (default: off)
|
||||
|
||||
# Per-platform overrides (optional):
|
||||
telegram: true # Enable for Telegram
|
||||
discord: true # Enable for Discord
|
||||
slack: true # Enable for Slack
|
||||
api_server: true # Enable for API server
|
||||
|
||||
# Tuning (optional):
|
||||
edit_interval: 1.5 # Seconds between message edits (default: 1.5)
|
||||
min_tokens: 20 # Tokens before first display (default: 20)
|
||||
```
|
||||
|
||||
| Variable | Default | Description |
|
||||
|----------|---------|-------------|
|
||||
| `HERMES_STREAMING_ENABLED` | `false` | Master switch via env var |
|
||||
| `streaming.enabled` | `false` | Master switch via config |
|
||||
| `streaming.<platform>` | _(unset)_ | Per-platform override |
|
||||
| `streaming.edit_interval` | `1.5` | Seconds between Telegram/Discord edits |
|
||||
| `streaming.min_tokens` | `20` | Minimum tokens before first message |
|
||||
|
||||
## Interaction with Other Features
|
||||
|
||||
### Tool Execution
|
||||
|
||||
When the agent calls tools (terminal, file operations, web search, etc.), no text tokens are generated — tool arguments are accumulated silently. Tool progress messages continue to work as before. After tools finish, the next LLM call may produce the final text response, which streams normally.
|
||||
|
||||
### Context Compression
|
||||
|
||||
Compression happens between API calls, not during streaming. No interaction.
|
||||
|
||||
### Interrupts
|
||||
|
||||
If the user sends a new message while streaming, the agent is interrupted. The HTTP connection is closed (stopping token generation), accumulated text is shown as-is, and the new message is processed.
|
||||
|
||||
### Prompt Caching
|
||||
|
||||
Streaming doesn't affect prompt caching — the request is identical, just with `stream=True` added.
|
||||
|
||||
### Responses API (Codex)
|
||||
|
||||
The Codex/Responses API streaming path also supports the `stream_callback`. Token deltas from `response.output_text.delta` events are emitted via the callback.
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### Streaming isn't working
|
||||
|
||||
1. Check the config: `streaming.enabled: true` in config.yaml or `HERMES_STREAMING_ENABLED=true`
|
||||
2. Check per-platform: `streaming.telegram: true` overrides the master switch
|
||||
3. Restart the gateway after changing config
|
||||
4. Check logs for "Streaming failed, falling back" — indicates the provider may not support streaming
|
||||
|
||||
### Response appears twice
|
||||
|
||||
If you see the response both in a progressively-edited message AND as a separate final message, this is a bug. The streaming system should suppress the normal send when tokens were delivered via streaming. Please file an issue.
|
||||
|
||||
### Messages update too slowly
|
||||
|
||||
The default edit interval is 1.5 seconds (to respect platform rate limits). You can lower it in config:
|
||||
|
||||
```yaml
|
||||
streaming:
|
||||
edit_interval: 1.0 # Faster updates (may hit rate limits)
|
||||
```
|
||||
|
||||
Going below 1.0s risks Telegram rate limiting (429 errors).
|
||||
|
||||
### No streaming on WhatsApp/HomeAssistant
|
||||
|
||||
These platforms don't support message editing, so streaming automatically falls back to non-streaming. This is expected behavior.
|
||||
@@ -1,12 +1,12 @@
|
||||
---
|
||||
sidebar_position: 1
|
||||
title: "Messaging Gateway"
|
||||
description: "Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, or Email — architecture and setup overview"
|
||||
description: "Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, Email, or any OpenAI-compatible frontend — architecture and setup overview"
|
||||
---
|
||||
|
||||
# Messaging Gateway
|
||||
|
||||
Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, or Email. The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages.
|
||||
Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, Email, or any OpenAI-compatible frontend (Open WebUI, LobeChat, etc.). The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages.
|
||||
|
||||
## Architecture
|
||||
|
||||
@@ -15,12 +15,12 @@ Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, or Email. The
|
||||
│ Hermes Gateway │
|
||||
├─────────────────────────────────────────────────────────────────┤
|
||||
│ │
|
||||
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ ┌────────┐ ┌───────┐│
|
||||
│ │ Telegram │ │ Discord │ │ WhatsApp │ │ Slack │ │ Signal │ │ Email ││
|
||||
│ │ Adapter │ │ Adapter │ │ Adapter │ │Adapter │ │Adapter │ │Adapter││
|
||||
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └───┬────┘ └───┬────┘ └──┬────┘│
|
||||
│ │ │ │ │ │ │ │
|
||||
│ └─────────────┼────────────┼────────────┼──────────┼─────────┘ │
|
||||
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ ┌────────┐ ┌───────┐ ┌──────────┐│
|
||||
│ │ Telegram │ │ Discord │ │ WhatsApp │ │ Slack │ │ Signal │ │ Email │ │API Server││
|
||||
│ │ Adapter │ │ Adapter │ │ Adapter │ │Adapter │ │Adapter │ │Adapter│ │ (OpenAI) ││
|
||||
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └───┬────┘ └───┬────┘ └──┬────┘ └────┬─────┘│
|
||||
│ │ │ │ │ │ │ │ │
|
||||
│ └─────────────┼────────────┼────────────┼──────────┼─────────┼───────────┘ │
|
||||
│ │ │
|
||||
│ ┌────────▼────────┐ │
|
||||
│ │ Session Store │ │
|
||||
@@ -204,6 +204,7 @@ Each platform has its own toolset:
|
||||
| Slack | `hermes-slack` | Full tools including terminal |
|
||||
| Signal | `hermes-signal` | Full tools including terminal |
|
||||
| Email | `hermes-email` | Full tools including terminal |
|
||||
| API Server | `hermes-api_server` | Full tools including terminal |
|
||||
|
||||
## Next Steps
|
||||
|
||||
@@ -213,3 +214,5 @@ Each platform has its own toolset:
|
||||
- [WhatsApp Setup](whatsapp.md)
|
||||
- [Signal Setup](signal.md)
|
||||
- [Email Setup](email.md)
|
||||
- [Open WebUI Setup](open-webui.md)
|
||||
- [API Server](../features/api-server.md)
|
||||
|
||||
213
website/docs/user-guide/messaging/open-webui.md
Normal file
213
website/docs/user-guide/messaging/open-webui.md
Normal file
@@ -0,0 +1,213 @@
|
||||
---
|
||||
sidebar_position: 8
|
||||
title: "Open WebUI"
|
||||
description: "Connect Open WebUI to Hermes Agent via the OpenAI-compatible API server"
|
||||
---
|
||||
|
||||
# Open WebUI Integration
|
||||
|
||||
[Open WebUI](https://github.com/open-webui/open-webui) (126k★) is the most popular self-hosted chat interface for AI. With Hermes Agent's built-in API server, you can use Open WebUI as a polished web frontend for your agent — complete with conversation management, user accounts, and a modern chat interface.
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
┌──────────────────┐ POST /v1/chat/completions ┌──────────────────────┐
|
||||
│ Open WebUI │ ──────────────────────────────► │ hermes-agent │
|
||||
│ (browser UI) │ SSE streaming response │ gateway API server │
|
||||
│ port 3000 │ ◄────────────────────────────── │ port 8642 │
|
||||
└──────────────────┘ └──────────────────────┘
|
||||
```
|
||||
|
||||
Open WebUI connects to Hermes Agent's API server just like it would connect to OpenAI. Your agent handles the requests with its full toolset — terminal, file operations, web search, memory, skills — and returns the final response.
|
||||
|
||||
## Quick Setup
|
||||
|
||||
### 1. Enable the API server
|
||||
|
||||
Add to `~/.hermes/.env`:
|
||||
|
||||
```bash
|
||||
API_SERVER_ENABLED=true
|
||||
# Optional: set a key for auth (recommended if accessible beyond localhost)
|
||||
# API_SERVER_KEY=your-secret-key
|
||||
```
|
||||
|
||||
### 2. Start Hermes Agent gateway
|
||||
|
||||
```bash
|
||||
hermes gateway
|
||||
```
|
||||
|
||||
You should see:
|
||||
|
||||
```
|
||||
[API Server] API server listening on http://127.0.0.1:8642
|
||||
```
|
||||
|
||||
### 3. Start Open WebUI
|
||||
|
||||
```bash
|
||||
docker run -d -p 3000:8080 \
|
||||
-e OPENAI_API_BASE_URL=http://host.docker.internal:8642/v1 \
|
||||
-e OPENAI_API_KEY=not-needed \
|
||||
--add-host=host.docker.internal:host-gateway \
|
||||
-v open-webui:/app/backend/data \
|
||||
--name open-webui \
|
||||
--restart always \
|
||||
ghcr.io/open-webui/open-webui:main
|
||||
```
|
||||
|
||||
If you set an `API_SERVER_KEY`, use it instead of `not-needed`:
|
||||
|
||||
```bash
|
||||
-e OPENAI_API_KEY=your-secret-key
|
||||
```
|
||||
|
||||
### 4. Open the UI
|
||||
|
||||
Go to **http://localhost:3000**. Create your admin account (the first user becomes admin). You should see **hermes-agent** in the model dropdown. Start chatting!
|
||||
|
||||
## Docker Compose Setup
|
||||
|
||||
For a more permanent setup, create a `docker-compose.yml`:
|
||||
|
||||
```yaml
|
||||
services:
|
||||
open-webui:
|
||||
image: ghcr.io/open-webui/open-webui:main
|
||||
ports:
|
||||
- "3000:8080"
|
||||
volumes:
|
||||
- open-webui:/app/backend/data
|
||||
environment:
|
||||
- OPENAI_API_BASE_URL=http://host.docker.internal:8642/v1
|
||||
- OPENAI_API_KEY=not-needed
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
restart: always
|
||||
|
||||
volumes:
|
||||
open-webui:
|
||||
```
|
||||
|
||||
Then:
|
||||
|
||||
```bash
|
||||
docker compose up -d
|
||||
```
|
||||
|
||||
## Configuring via the Admin UI
|
||||
|
||||
If you prefer to configure the connection through the UI instead of environment variables:
|
||||
|
||||
1. Log in to Open WebUI at **http://localhost:3000**
|
||||
2. Click your **profile avatar** → **Admin Settings**
|
||||
3. Go to **Connections**
|
||||
4. Under **OpenAI API**, click the **wrench icon** (Manage)
|
||||
5. Click **+ Add New Connection**
|
||||
6. Enter:
|
||||
- **URL**: `http://host.docker.internal:8642/v1`
|
||||
- **API Key**: your key or any non-empty value (e.g., `not-needed`)
|
||||
7. Click the **checkmark** to verify the connection
|
||||
8. **Save**
|
||||
|
||||
The **hermes-agent** model should now appear in the model dropdown.
|
||||
|
||||
:::warning
|
||||
Environment variables only take effect on Open WebUI's **first launch**. After that, connection settings are stored in its internal database. To change them later, use the Admin UI or delete the Docker volume and start fresh.
|
||||
:::
|
||||
|
||||
## API Type: Chat Completions vs Responses
|
||||
|
||||
Open WebUI supports two API modes when connecting to a backend:
|
||||
|
||||
| Mode | Format | When to use |
|
||||
|------|--------|-------------|
|
||||
| **Chat Completions** (default) | `/v1/chat/completions` | Recommended. Works out of the box. |
|
||||
| **Responses** (experimental) | `/v1/responses` | For server-side conversation state via `previous_response_id`. |
|
||||
|
||||
### Using Chat Completions (recommended)
|
||||
|
||||
This is the default and requires no extra configuration. Open WebUI sends standard OpenAI-format requests and Hermes Agent responds accordingly. Each request includes the full conversation history.
|
||||
|
||||
### Using Responses API
|
||||
|
||||
To use the Responses API mode:
|
||||
|
||||
1. Go to **Admin Settings** → **Connections** → **OpenAI** → **Manage**
|
||||
2. Edit your hermes-agent connection
|
||||
3. Change **API Type** from "Chat Completions" to **"Responses (Experimental)"**
|
||||
4. Save
|
||||
|
||||
With the Responses API, Open WebUI sends requests in the Responses format (`input` array + `instructions`), and Hermes Agent can preserve full tool call history across turns via `previous_response_id`.
|
||||
|
||||
:::note
|
||||
Open WebUI currently manages conversation history client-side even in Responses mode — it sends the full message history in each request rather than using `previous_response_id`. The Responses API mode is mainly useful for future compatibility as frontends evolve.
|
||||
:::
|
||||
|
||||
## How It Works
|
||||
|
||||
When you send a message in Open WebUI:
|
||||
|
||||
1. Open WebUI sends a `POST /v1/chat/completions` request with your message and conversation history
|
||||
2. Hermes Agent creates an AIAgent instance with its full toolset
|
||||
3. The agent processes your request — it may call tools (terminal, file operations, web search, etc.)
|
||||
4. Tool calls happen invisibly server-side
|
||||
5. The agent's final text response is returned to Open WebUI
|
||||
6. Open WebUI displays the response in its chat interface
|
||||
|
||||
Your agent has access to all the same tools and capabilities as when using the CLI or Telegram — the only difference is the frontend.
|
||||
|
||||
## Configuration Reference
|
||||
|
||||
### Hermes Agent (API server)
|
||||
|
||||
| Variable | Default | Description |
|
||||
|----------|---------|-------------|
|
||||
| `API_SERVER_ENABLED` | `false` | Enable the API server |
|
||||
| `API_SERVER_PORT` | `8642` | HTTP server port |
|
||||
| `API_SERVER_HOST` | `127.0.0.1` | Bind address |
|
||||
| `API_SERVER_KEY` | _(none)_ | Bearer token for auth. No key = allow all. |
|
||||
|
||||
### Open WebUI
|
||||
|
||||
| Variable | Description |
|
||||
|----------|-------------|
|
||||
| `OPENAI_API_BASE_URL` | Hermes Agent's API URL (include `/v1`) |
|
||||
| `OPENAI_API_KEY` | Must be non-empty. Match your `API_SERVER_KEY`. |
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### No models appear in the dropdown
|
||||
|
||||
- **Check the URL has `/v1` suffix**: `http://host.docker.internal:8642/v1` (not just `:8642`)
|
||||
- **Verify the gateway is running**: `curl http://localhost:8642/health` should return `{"status": "ok"}`
|
||||
- **Check model listing**: `curl http://localhost:8642/v1/models` should return a list with `hermes-agent`
|
||||
- **Docker networking**: From inside Docker, `localhost` means the container, not your host. Use `host.docker.internal` or `--network=host`.
|
||||
|
||||
### Connection test passes but no models load
|
||||
|
||||
This is almost always the missing `/v1` suffix. Open WebUI's connection test is a basic connectivity check — it doesn't verify model listing works.
|
||||
|
||||
### Response takes a long time
|
||||
|
||||
Hermes Agent may be executing multiple tool calls (reading files, running commands, searching the web) before producing its final response. This is normal for complex queries. The response appears all at once when the agent finishes.
|
||||
|
||||
### "Invalid API key" errors
|
||||
|
||||
Make sure your `OPENAI_API_KEY` in Open WebUI matches the `API_SERVER_KEY` in Hermes Agent. If no key is configured on the Hermes side, any non-empty value works.
|
||||
|
||||
## Linux Docker (no Docker Desktop)
|
||||
|
||||
On Linux without Docker Desktop, `host.docker.internal` doesn't resolve by default. Options:
|
||||
|
||||
```bash
|
||||
# Option 1: Add host mapping
|
||||
docker run --add-host=host.docker.internal:host-gateway ...
|
||||
|
||||
# Option 2: Use host networking
|
||||
docker run --network=host -e OPENAI_API_BASE_URL=http://localhost:8642/v1 ...
|
||||
|
||||
# Option 3: Use Docker bridge IP
|
||||
docker run -e OPENAI_API_BASE_URL=http://172.17.0.1:8642/v1 ...
|
||||
```
|
||||
@@ -161,6 +161,26 @@ Hermes Agent works in Telegram group chats with a few considerations:
|
||||
- When privacy mode is off (or bot is admin), the bot sees all messages and can participate naturally
|
||||
- `TELEGRAM_ALLOWED_USERS` still applies — only authorized users can trigger the bot, even in groups
|
||||
|
||||
## Reply Threading Mode
|
||||
|
||||
When a response is longer than 4096 characters, Telegram splits it into multiple message chunks. By default, only the first chunk is sent as a reply to your message (showing the "replied to" bubble). You can change this behavior:
|
||||
|
||||
| Mode | Behavior |
|
||||
|------|----------|
|
||||
| `first` (default) | First chunk replies to your message, rest are standalone |
|
||||
| `all` | Every chunk replies to your message |
|
||||
| `off` | No chunks reply — all sent as standalone messages |
|
||||
|
||||
### Configure via environment variable
|
||||
|
||||
```bash
|
||||
TELEGRAM_REPLY_TO_MODE=off # or "first" or "all"
|
||||
```
|
||||
|
||||
### Configure via gateway config
|
||||
|
||||
In your gateway configuration, set `reply_to_mode` on the Telegram platform config.
|
||||
|
||||
## Recent Bot API Features (2024–2025)
|
||||
|
||||
- **Privacy policy:** Telegram now requires bots to have a privacy policy. Set one via BotFather with `/setprivacy_policy`, or Telegram may auto-generate a placeholder. This is particularly important if your bot is public-facing.
|
||||
|
||||
Reference in New Issue
Block a user