# NOTE(review): reconstructed from a whitespace-mangled git patch adding proxy
# mode to gateway/run.py.  The same patch also:
#
#   (a) prepends a dispatch to GatewayRunner._run_agent:
#
#           if self._get_proxy_url():
#               return await self._run_agent_via_proxy(
#                   message=message, context_prompt=context_prompt,
#                   history=history, source=source, session_id=session_id,
#                   session_key=session_key, event_message_id=event_message_id,
#               )
#
#   (b) registers GATEWAY_PROXY_URL (plain) and GATEWAY_PROXY_KEY (password)
#       in hermes_cli.config.OPTIONAL_ENV_VARS, category "messaging",
#       advanced=True.
#
# The two definitions below are methods of GatewayRunner.

# ------------------------------------------------------------------
# Proxy mode: forward messages to a remote Hermes API server
# ------------------------------------------------------------------

def _get_proxy_url(self) -> Optional[str]:
    """Return the proxy base URL if proxy mode is configured, else None.

    Resolution order:
      1. ``GATEWAY_PROXY_URL`` environment variable (convenient for Docker).
      2. ``gateway.proxy_url`` in config.yaml.

    Trailing slashes are stripped so callers can append paths safely.
    """
    url = os.getenv("GATEWAY_PROXY_URL", "").strip()
    if url:
        return url.rstrip("/")
    cfg = _load_gateway_config()
    # ``or ""`` guards against an explicit ``proxy_url: null`` in the YAML,
    # which would make dict.get return None and ``.strip()`` raise
    # AttributeError.  (Fix: original used ``.get("proxy_url", "")``.)
    url = ((cfg.get("gateway") or {}).get("proxy_url") or "").strip()
    if url:
        return url.rstrip("/")
    return None

async def _run_agent_via_proxy(
    self,
    message: str,
    context_prompt: str,
    history: List[Dict[str, Any]],
    source: "SessionSource",
    session_id: str,
    session_key: Optional[str] = None,
    event_message_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Forward the message to a remote Hermes API server instead of
    running a local AIAgent.

    When ``GATEWAY_PROXY_URL`` (or ``gateway.proxy_url`` in config.yaml)
    is set, the gateway becomes a thin relay: it handles platform I/O
    (encryption, threading, media) and delegates all agent work to the
    remote server via ``POST /v1/chat/completions`` with SSE streaming.

    This lets a Docker container handle Matrix E2EE while the actual
    agent runs on the host with full access to local files, memory,
    skills, and a unified session store.

    Returns a dict shaped like the local ``_run_agent`` result
    (``final_response``, ``messages``, ``api_calls``, ``tools``, plus
    ``history_offset`` / ``session_id`` / ``response_previewed``).
    """
    try:
        from aiohttp import ClientSession as _AioClientSession, ClientTimeout
    except ImportError:
        return {
            "final_response": "⚠️ Proxy mode requires aiohttp. Install with: pip install aiohttp",
            "messages": [],
            "api_calls": 0,
            "tools": [],
        }

    proxy_url = self._get_proxy_url()
    if not proxy_url:
        return {
            "final_response": "⚠️ Proxy URL not configured (GATEWAY_PROXY_URL or gateway.proxy_url)",
            "messages": [],
            "api_calls": 0,
            "tools": [],
        }

    proxy_key = os.getenv("GATEWAY_PROXY_KEY", "").strip()

    # Build messages in OpenAI chat format --------------------------
    #
    # The remote api_server can maintain session continuity via
    # X-Hermes-Session-Id, so it loads its own history.  We only need
    # to send the current user message; include a compact text-only
    # version of local history so the first exchange has context.
    # Tool calls / tool results are NOT forwarded — the remote handles
    # tool replay and system prompts itself.
    api_messages: List[Dict[str, str]] = []

    if context_prompt:
        api_messages.append({"role": "system", "content": context_prompt})

    for msg in history:
        role = msg.get("role")
        content = msg.get("content")
        # Skips tool messages and assistant turns with None content
        # (tool-call placeholders).
        if role in ("user", "assistant") and content:
            api_messages.append({"role": role, "content": content})

    api_messages.append({"role": "user", "content": message})

    # HTTP headers ---------------------------------------------------
    headers: Dict[str, str] = {"Content-Type": "application/json"}
    if proxy_key:
        headers["Authorization"] = f"Bearer {proxy_key}"
    if session_id:
        headers["X-Hermes-Session-Id"] = session_id

    body = {
        "model": "hermes-agent",
        "messages": api_messages,
        "stream": True,
    }

    # Set up platform streaming if available -------------------------
    _stream_consumer = None
    _scfg = getattr(getattr(self, "config", None), "streaming", None)
    if _scfg is None:
        from gateway.config import StreamingConfig
        _scfg = StreamingConfig()

    platform_key = _platform_config_key(source.platform)
    user_config = _load_gateway_config()
    from gateway.display_config import resolve_display_setting
    # Per-platform display override wins over the global streaming config.
    _plat_streaming = resolve_display_setting(
        user_config, platform_key, "streaming"
    )
    _streaming_enabled = (
        _scfg.enabled and _scfg.transport != "off"
        if _plat_streaming is None
        else bool(_plat_streaming)
    )

    if source.thread_id:
        _thread_metadata: Optional[Dict[str, Any]] = {"thread_id": source.thread_id}
    else:
        _thread_metadata = None

    if _streaming_enabled:
        try:
            from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig
            from gateway.config import Platform
            _adapter = self.adapters.get(source.platform)
            if _adapter:
                # Adapters that cannot edit sent messages get no cursor;
                # Matrix gets no cursor regardless (E2EE edit churn).
                _adapter_supports_edit = getattr(_adapter, "SUPPORTS_MESSAGE_EDITING", True)
                _effective_cursor = _scfg.cursor if _adapter_supports_edit else ""
                if source.platform == Platform.MATRIX:
                    _effective_cursor = ""
                _consumer_cfg = StreamConsumerConfig(
                    edit_interval=_scfg.edit_interval,
                    buffer_threshold=_scfg.buffer_threshold,
                    cursor=_effective_cursor,
                )
                _stream_consumer = GatewayStreamConsumer(
                    adapter=_adapter,
                    chat_id=source.chat_id,
                    config=_consumer_cfg,
                    metadata=_thread_metadata,
                )
        except Exception as _sc_err:
            # Streaming is best-effort; fall back to a single final send.
            logger.debug("Proxy: could not set up stream consumer: %s", _sc_err)

    # Run the stream consumer task in the background
    stream_task = None
    if _stream_consumer:
        stream_task = asyncio.create_task(_stream_consumer.run())

    # Send typing indicator (best-effort)
    _adapter = self.adapters.get(source.platform)
    if _adapter:
        try:
            await _adapter.send_typing(source.chat_id, metadata=_thread_metadata)
        except Exception:
            pass

    # Make the HTTP request with SSE streaming -----------------------
    full_response = ""
    _start = time.time()

    try:
        # total=0 disables the overall timeout; sock_read bounds the gap
        # between SSE chunks (30 min) so a dead remote doesn't hang forever.
        _timeout = ClientTimeout(total=0, sock_read=1800)
        async with _AioClientSession(timeout=_timeout) as session:
            async with session.post(
                f"{proxy_url}/v1/chat/completions",
                json=body,
                headers=headers,
            ) as resp:
                if resp.status != 200:
                    error_text = await resp.text()
                    logger.warning(
                        "Proxy error (%d) from %s: %s",
                        resp.status, proxy_url, error_text[:500],
                    )
                    return {
                        "final_response": f"⚠️ Proxy error ({resp.status}): {error_text[:300]}",
                        "messages": [],
                        "api_calls": 0,
                        "tools": [],
                    }

                # Parse SSE stream.  Chunks may split events arbitrarily,
                # so accumulate into a buffer and consume complete lines.
                done = False
                buffer = ""
                async for chunk in resp.content.iter_any():
                    buffer += chunk.decode("utf-8", errors="replace")

                    while "\n" in buffer:
                        line, buffer = buffer.split("\n", 1)
                        line = line.strip()
                        if not line:
                            continue
                        if line.startswith("data: "):
                            data = line[6:]
                            if data.strip() == "[DONE]":
                                # Fix: the original ``break`` only exited
                                # this inner while-loop, so reading (and
                                # appending deltas) continued past [DONE].
                                done = True
                                break
                            try:
                                obj = json.loads(data)
                                choices = obj.get("choices", [])
                                if choices:
                                    delta = choices[0].get("delta", {})
                                    content = delta.get("content", "")
                                    if content:
                                        full_response += content
                                        if _stream_consumer:
                                            _stream_consumer.on_delta(content)
                            except json.JSONDecodeError:
                                # Ignore malformed events; keep streaming.
                                pass
                    if done:
                        break

    except asyncio.CancelledError:
        raise
    except Exception as e:
        logger.error("Proxy connection error to %s: %s", proxy_url, e)
        if not full_response:
            return {
                "final_response": f"⚠️ Proxy connection error: {e}",
                "messages": [],
                "api_calls": 0,
                "tools": [],
            }
        # Partial response — fall through and return what we got.
    finally:
        # Finalize stream consumer even on early return / error paths.
        if _stream_consumer:
            _stream_consumer.finish()
        if stream_task:
            try:
                await asyncio.wait_for(stream_task, timeout=5.0)
            except (asyncio.TimeoutError, asyncio.CancelledError):
                stream_task.cancel()

    _elapsed = time.time() - _start
    logger.info(
        "proxy response: url=%s session=%s time=%.1fs response=%d chars",
        proxy_url, (session_id or "")[:20], _elapsed, len(full_response),
    )

    return {
        "final_response": full_response or "(No response from remote agent)",
        "messages": [
            {"role": "user", "content": message},
            {"role": "assistant", "content": full_response},
        ],
        "api_calls": 1,
        "tools": [],
        # Callers use history_offset to splice the new turn after the
        # history we already had locally.
        "history_offset": len(history),
        "session_id": session_id,
        # True when the platform already saw the text via streaming edits.
        "response_previewed": _stream_consumer is not None and bool(full_response),
    }
# NOTE(review): reconstructed from a whitespace-mangled git patch.  This span
# also carried the tail of the hermes_cli/config.py hunk (the end of the
# GATEWAY_PROXY_KEY entry — password=True, category "messaging", advanced,
# "Must match the API_SERVER_KEY on the remote host" — followed by the
# pre-existing WEBHOOK_ENABLED context lines); that content belongs to
# hermes_cli/config.py, not this test module.

"""Tests for gateway proxy mode — forwarding messages to a remote API server."""

import asyncio
import json
import os
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from gateway.config import Platform, StreamingConfig
from gateway.run import GatewayRunner
from gateway.session import SessionSource


def _make_runner(proxy_url=None):
    """Create a minimal GatewayRunner for proxy tests (bypasses __init__)."""
    runner = object.__new__(GatewayRunner)
    runner.adapters = {}
    runner.config = MagicMock()
    runner.config.streaming = StreamingConfig()
    runner._running_agents = {}
    runner._session_model_overrides = {}
    runner._agent_cache = {}
    runner._agent_cache_lock = None
    return runner


def _make_source(platform=Platform.MATRIX):
    """Build a throwaway SessionSource for a Matrix group chat."""
    return SessionSource(
        platform=platform,
        chat_id="!room:server.org",
        chat_name="Test Room",
        chat_type="group",
        user_id="@user:server.org",
        user_name="testuser",
        thread_id=None,
    )


class _FakeSSEResponse:
    """Simulates an aiohttp response with SSE streaming."""

    def __init__(self, status=200, sse_chunks=None, error_text=""):
        self.status = status
        self._sse_chunks = sse_chunks or []
        self._error_text = error_text
        # The proxy reads ``resp.content.iter_any()`` — point content at self.
        self.content = self

    async def text(self):
        return self._error_text

    async def iter_any(self):
        for chunk in self._sse_chunks:
            if isinstance(chunk, str):
                chunk = chunk.encode("utf-8")
            yield chunk

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        pass


class _FakeSession:
    """Simulates an aiohttp.ClientSession with captured request args."""

    def __init__(self, response):
        self._response = response
        self.captured_url = None
        self.captured_json = None
        self.captured_headers = None

    def post(self, url, json=None, headers=None, **kwargs):
        self.captured_url = url
        self.captured_json = json
        self.captured_headers = headers
        return self._response

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        pass


def _patch_aiohttp(session):
    """Patch aiohttp.ClientSession to return our fake session."""
    return patch(
        "aiohttp.ClientSession",
        return_value=session,
    )


class TestGetProxyUrl:
    """Test _get_proxy_url() config resolution."""

    def test_returns_none_when_not_configured(self, monkeypatch):
        monkeypatch.delenv("GATEWAY_PROXY_URL", raising=False)
        runner = _make_runner()
        with patch("gateway.run._load_gateway_config", return_value={}):
            assert runner._get_proxy_url() is None

    def test_reads_from_env_var(self, monkeypatch):
        monkeypatch.setenv("GATEWAY_PROXY_URL", "http://192.168.1.100:8642")
        runner = _make_runner()
        assert runner._get_proxy_url() == "http://192.168.1.100:8642"

    def test_strips_trailing_slash(self, monkeypatch):
        monkeypatch.setenv("GATEWAY_PROXY_URL", "http://host:8642/")
        runner = _make_runner()
        assert runner._get_proxy_url() == "http://host:8642"

    def test_reads_from_config_yaml(self, monkeypatch):
        monkeypatch.delenv("GATEWAY_PROXY_URL", raising=False)
        runner = _make_runner()
        cfg = {"gateway": {"proxy_url": "http://10.0.0.1:8642"}}
        with patch("gateway.run._load_gateway_config", return_value=cfg):
            assert runner._get_proxy_url() == "http://10.0.0.1:8642"

    def test_env_var_overrides_config(self, monkeypatch):
        monkeypatch.setenv("GATEWAY_PROXY_URL", "http://env-host:8642")
        runner = _make_runner()
        cfg = {"gateway": {"proxy_url": "http://config-host:8642"}}
        with patch("gateway.run._load_gateway_config", return_value=cfg):
            assert runner._get_proxy_url() == "http://env-host:8642"

    def test_empty_string_treated_as_unset(self, monkeypatch):
        monkeypatch.setenv("GATEWAY_PROXY_URL", " ")
        runner = _make_runner()
        with patch("gateway.run._load_gateway_config", return_value={}):
            assert runner._get_proxy_url() is None


class TestRunAgentProxyDispatch:
    """Test that _run_agent() delegates to proxy when configured."""

    @pytest.mark.asyncio
    async def test_run_agent_delegates_to_proxy(self, monkeypatch):
        monkeypatch.setenv("GATEWAY_PROXY_URL", "http://host:8642")
        runner = _make_runner()
        source = _make_source()

        expected_result = {
            "final_response": "Hello from remote!",
            "messages": [
                {"role": "user", "content": "hi"},
                {"role": "assistant", "content": "Hello from remote!"},
            ],
            "api_calls": 1,
            "tools": [],
        }

        runner._run_agent_via_proxy = AsyncMock(return_value=expected_result)

        result = await runner._run_agent(
            message="hi",
            context_prompt="",
            history=[],
            source=source,
            session_id="test-session-123",
            session_key="test-key",
        )

        assert result["final_response"] == "Hello from remote!"
        runner._run_agent_via_proxy.assert_called_once()

    @pytest.mark.asyncio
    async def test_run_agent_skips_proxy_when_not_configured(self, monkeypatch):
        monkeypatch.delenv("GATEWAY_PROXY_URL", raising=False)
        runner = _make_runner()

        runner._run_agent_via_proxy = AsyncMock()

        with patch("gateway.run._load_gateway_config", return_value={}):
            try:
                await runner._run_agent(
                    message="hi",
                    context_prompt="",
                    history=[],
                    source=_make_source(),
                    session_id="test-session",
                )
            except Exception:
                pass  # Expected — bare runner can't create a real agent

        runner._run_agent_via_proxy.assert_not_called()


class TestRunAgentViaProxy:
    """Test the actual proxy HTTP forwarding logic."""

    @pytest.mark.asyncio
    async def test_builds_correct_request(self, monkeypatch):
        monkeypatch.setenv("GATEWAY_PROXY_URL", "http://host:8642")
        monkeypatch.setenv("GATEWAY_PROXY_KEY", "test-key-123")
        runner = _make_runner()
        source = _make_source()

        # Fix: the original list held three adjacent string literals with no
        # separating commas (implicit string concatenation — the classic
        # missing-comma pitfall, ruff ISC001).  Three explicit chunks are
        # behavior-identical: the SSE parser joins chunks into one buffer.
        resp = _FakeSSEResponse(
            status=200,
            sse_chunks=[
                'data: {"choices":[{"delta":{"content":"Hello"}}]}\n\n',
                'data: {"choices":[{"delta":{"content":" world"}}]}\n\n',
                "data: [DONE]\n\n",
            ],
        )
        session = _FakeSession(resp)

        with patch("gateway.run._load_gateway_config", return_value={}):
            with _patch_aiohttp(session):
                with patch("aiohttp.ClientTimeout"):
                    result = await runner._run_agent_via_proxy(
                        message="How are you?",
                        context_prompt="You are helpful.",
                        history=[
                            {"role": "user", "content": "Hello"},
                            {"role": "assistant", "content": "Hi there!"},
                        ],
                        source=source,
                        session_id="session-abc",
                    )

        # Verify request URL
        assert session.captured_url == "http://host:8642/v1/chat/completions"

        # Verify auth header
        assert session.captured_headers["Authorization"] == "Bearer test-key-123"

        # Verify session ID header
        assert session.captured_headers["X-Hermes-Session-Id"] == "session-abc"

        # Verify messages include system, history, and current message
        messages = session.captured_json["messages"]
        assert messages[0] == {"role": "system", "content": "You are helpful."}
        assert messages[1] == {"role": "user", "content": "Hello"}
        assert messages[2] == {"role": "assistant", "content": "Hi there!"}
        assert messages[3] == {"role": "user", "content": "How are you?"}

        # Verify streaming is requested
        assert session.captured_json["stream"] is True

        # Verify response was assembled
        assert result["final_response"] == "Hello world"

    @pytest.mark.asyncio
    async def test_handles_http_error(self, monkeypatch):
        monkeypatch.setenv("GATEWAY_PROXY_URL", "http://host:8642")
        monkeypatch.delenv("GATEWAY_PROXY_KEY", raising=False)
        runner = _make_runner()
        source = _make_source()

        resp = _FakeSSEResponse(status=401, error_text="Unauthorized: invalid API key")
        session = _FakeSession(resp)

        with patch("gateway.run._load_gateway_config", return_value={}):
            with _patch_aiohttp(session):
                with patch("aiohttp.ClientTimeout"):
                    result = await runner._run_agent_via_proxy(
                        message="hi",
                        context_prompt="",
                        history=[],
                        source=source,
                        session_id="test",
                    )

        assert "Proxy error (401)" in result["final_response"]
        assert result["api_calls"] == 0

    @pytest.mark.asyncio
    async def test_handles_connection_error(self, monkeypatch):
        monkeypatch.setenv("GATEWAY_PROXY_URL", "http://unreachable:8642")
        monkeypatch.delenv("GATEWAY_PROXY_KEY", raising=False)
        runner = _make_runner()
        source = _make_source()

        class _ErrorSession:
            def post(self, *args, **kwargs):
                raise ConnectionError("Connection refused")

            async def __aenter__(self):
                return self

            async def __aexit__(self, *args):
                pass

        with patch("gateway.run._load_gateway_config", return_value={}):
            with patch("aiohttp.ClientSession", return_value=_ErrorSession()):
                with patch("aiohttp.ClientTimeout"):
                    result = await runner._run_agent_via_proxy(
                        message="hi",
                        context_prompt="",
                        history=[],
                        source=source,
                        session_id="test",
                    )

        assert "Proxy connection error" in result["final_response"]

    @pytest.mark.asyncio
    async def test_skips_tool_messages_in_history(self, monkeypatch):
        monkeypatch.setenv("GATEWAY_PROXY_URL", "http://host:8642")
        monkeypatch.delenv("GATEWAY_PROXY_KEY", raising=False)
        runner = _make_runner()
        source = _make_source()

        resp = _FakeSSEResponse(
            status=200,
            sse_chunks=[b'data: {"choices":[{"delta":{"content":"ok"}}]}\n\ndata: [DONE]\n\n'],
        )
        session = _FakeSession(resp)

        history = [
            {"role": "user", "content": "search for X"},
            {"role": "assistant", "content": None, "tool_calls": [{"id": "tc1"}]},
            {"role": "tool", "content": "search results...", "tool_call_id": "tc1"},
            {"role": "assistant", "content": "Found results."},
        ]

        with patch("gateway.run._load_gateway_config", return_value={}):
            with _patch_aiohttp(session):
                with patch("aiohttp.ClientTimeout"):
                    await runner._run_agent_via_proxy(
                        message="tell me more",
                        context_prompt="",
                        history=history,
                        source=source,
                        session_id="test",
                    )

        # Only user and assistant with content should be forwarded
        messages = session.captured_json["messages"]
        roles = [m["role"] for m in messages]
        assert "tool" not in roles
        # assistant with None content should be skipped
        assert all(m.get("content") for m in messages)

    @pytest.mark.asyncio
    async def test_result_shape_matches_run_agent(self, monkeypatch):
        monkeypatch.setenv("GATEWAY_PROXY_URL", "http://host:8642")
        monkeypatch.delenv("GATEWAY_PROXY_KEY", raising=False)
        runner = _make_runner()
        source = _make_source()

        resp = _FakeSSEResponse(
            status=200,
            sse_chunks=[b'data: {"choices":[{"delta":{"content":"answer"}}]}\n\ndata: [DONE]\n\n'],
        )
        session = _FakeSession(resp)

        with patch("gateway.run._load_gateway_config", return_value={}):
            with _patch_aiohttp(session):
                with patch("aiohttp.ClientTimeout"):
                    result = await runner._run_agent_via_proxy(
                        message="hi",
                        context_prompt="",
                        history=[
                            {"role": "user", "content": "prev"},
                            {"role": "assistant", "content": "ok"},
                        ],
                        source=source,
                        session_id="sess-123",
                    )

        # Required keys that callers depend on
        assert "final_response" in result
        assert result["final_response"] == "answer"
        assert "messages" in result
        assert "api_calls" in result
        assert "tools" in result
        assert "history_offset" in result
        assert result["history_offset"] == 2  # len(history)
        assert "session_id" in result
        assert result["session_id"] == "sess-123"

    @pytest.mark.asyncio
    async def test_no_auth_header_without_key(self, monkeypatch):
        monkeypatch.setenv("GATEWAY_PROXY_URL", "http://host:8642")
        monkeypatch.delenv("GATEWAY_PROXY_KEY", raising=False)
        runner = _make_runner()
        source = _make_source()

        resp = _FakeSSEResponse(
            status=200,
            sse_chunks=[b'data: {"choices":[{"delta":{"content":"ok"}}]}\n\ndata: [DONE]\n\n'],
        )
        session = _FakeSession(resp)

        with patch("gateway.run._load_gateway_config", return_value={}):
            with _patch_aiohttp(session):
                with patch("aiohttp.ClientTimeout"):
                    await runner._run_agent_via_proxy(
                        message="hi",
                        context_prompt="",
                        history=[],
                        source=source,
                        session_id="test",
                    )

        assert "Authorization" not in session.captured_headers

    @pytest.mark.asyncio
    async def test_no_system_message_when_context_empty(self, monkeypatch):
        monkeypatch.setenv("GATEWAY_PROXY_URL", "http://host:8642")
        monkeypatch.delenv("GATEWAY_PROXY_KEY", raising=False)
        runner = _make_runner()
        source = _make_source()

        resp = _FakeSSEResponse(
            status=200,
            sse_chunks=[b'data: {"choices":[{"delta":{"content":"ok"}}]}\n\ndata: [DONE]\n\n'],
        )
        session = _FakeSession(resp)

        with patch("gateway.run._load_gateway_config", return_value={}):
            with _patch_aiohttp(session):
                with patch("aiohttp.ClientTimeout"):
                    await runner._run_agent_via_proxy(
                        message="hello",
                        context_prompt="",
                        history=[],
                        source=source,
                        session_id="test",
                    )

        # No system message should appear when context_prompt is empty
        messages = session.captured_json["messages"]
        assert len(messages) == 1
        assert messages[0]["role"] == "user"
        assert messages[0]["content"] == "hello"


class TestEnvVarRegistration:
    """Verify GATEWAY_PROXY_URL and GATEWAY_PROXY_KEY are registered."""

    def test_proxy_url_in_optional_env_vars(self):
        from hermes_cli.config import OPTIONAL_ENV_VARS
        assert "GATEWAY_PROXY_URL" in OPTIONAL_ENV_VARS
        info = OPTIONAL_ENV_VARS["GATEWAY_PROXY_URL"]
        assert info["category"] == "messaging"
        assert info["password"] is False

    def test_proxy_key_in_optional_env_vars(self):
        from hermes_cli.config import OPTIONAL_ENV_VARS
        assert "GATEWAY_PROXY_KEY" in OPTIONAL_ENV_VARS
        info = OPTIONAL_ENV_VARS["GATEWAY_PROXY_KEY"]
        assert info["category"] == "messaging"
        assert info["password"] is True