diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index b7a6a09693a..1b2e2e156b9 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -7,7 +7,9 @@ Exposes an HTTP server with endpoints: - GET /v1/responses/{response_id} — Retrieve a stored response - DELETE /v1/responses/{response_id} — Delete a stored response - GET /v1/models — lists hermes-agent as an available model +- GET /v1/capabilities — machine-readable API capabilities for external UIs - POST /v1/runs — start a run, returns run_id immediately (202) +- GET /v1/runs/{run_id} — retrieve current run status - GET /v1/runs/{run_id}/events — SSE stream of structured lifecycle events - POST /v1/runs/{run_id}/stop — interrupt a running agent - GET /health — health check @@ -590,6 +592,8 @@ class APIServerAdapter(BasePlatformAdapter): # Active run agent/task references for stop support self._active_run_agents: Dict[str, Any] = {} self._active_run_tasks: Dict[str, "asyncio.Task"] = {} + # Pollable run status for dashboards and external control-plane UIs. + self._run_statuses: Dict[str, Dict[str, Any]] = {} self._session_db: Optional[Any] = None # Lazy-init SessionDB for session continuity @staticmethod @@ -808,6 +812,51 @@ class APIServerAdapter(BasePlatformAdapter): ], }) + async def _handle_capabilities(self, request: "web.Request") -> "web.Response": + """GET /v1/capabilities — advertise the stable API surface. + + External UIs and orchestrators use this endpoint to discover the API + server's plugin-safe contract without scraping docs or assuming that + every Hermes version exposes the same endpoints. 
+ """ + auth_err = self._check_auth(request) + if auth_err: + return auth_err + + return web.json_response({ + "object": "hermes.api_server.capabilities", + "platform": "hermes-agent", + "model": self._model_name, + "auth": { + "type": "bearer", + "required": bool(self._api_key), + }, + "features": { + "chat_completions": True, + "chat_completions_streaming": True, + "responses_api": True, + "responses_streaming": True, + "run_submission": True, + "run_status": True, + "run_events_sse": True, + "run_stop": True, + "tool_progress_events": True, + "session_continuity_header": "X-Hermes-Session-Id", + "cors": bool(self._cors_origins), + }, + "endpoints": { + "health": {"method": "GET", "path": "/health"}, + "health_detailed": {"method": "GET", "path": "/health/detailed"}, + "models": {"method": "GET", "path": "/v1/models"}, + "chat_completions": {"method": "POST", "path": "/v1/chat/completions"}, + "responses": {"method": "POST", "path": "/v1/responses"}, + "runs": {"method": "POST", "path": "/v1/runs"}, + "run_status": {"method": "GET", "path": "/v1/runs/{run_id}"}, + "run_events": {"method": "GET", "path": "/v1/runs/{run_id}/events"}, + "run_stop": {"method": "POST", "path": "/v1/runs/{run_id}/stop"}, + }, + }) + async def _handle_chat_completions(self, request: "web.Request") -> "web.Response": """POST /v1/chat/completions — OpenAI Chat Completions format.""" auth_err = self._check_auth(request) @@ -2297,10 +2346,31 @@ class APIServerAdapter(BasePlatformAdapter): _MAX_CONCURRENT_RUNS = 10 # Prevent unbounded resource allocation _RUN_STREAM_TTL = 300 # seconds before orphaned runs are swept + _RUN_STATUS_TTL = 3600 # seconds to retain terminal run status for polling + + def _set_run_status(self, run_id: str, status: str, **fields: Any) -> Dict[str, Any]: + """Update pollable run status without exposing private agent objects.""" + now = time.time() + current = self._run_statuses.get(run_id, {}) + current.update({ + "object": "hermes.run", + "run_id": run_id, + 
"status": status, + "updated_at": now, + }) + current.setdefault("created_at", fields.pop("created_at", now)) + current.update(fields) + self._run_statuses[run_id] = current + return current def _make_run_event_callback(self, run_id: str, loop: "asyncio.AbstractEventLoop"): """Return a tool_progress_callback that pushes structured events to the run's SSE queue.""" def _push(event: Dict[str, Any]) -> None: + self._set_run_status( + run_id, + self._run_statuses.get(run_id, {}).get("status", "running"), + last_event=event.get("event"), + ) q = self._run_streams.get(run_id) if q is None: return @@ -2365,28 +2435,6 @@ class APIServerAdapter(BasePlatformAdapter): if not user_message: return web.json_response(_openai_error("No user message found in input"), status=400) - run_id = f"run_{uuid.uuid4().hex}" - loop = asyncio.get_running_loop() - q: "asyncio.Queue[Optional[Dict]]" = asyncio.Queue() - self._run_streams[run_id] = q - self._run_streams_created[run_id] = time.time() - - event_cb = self._make_run_event_callback(run_id, loop) - - # Also wire stream_delta_callback so message.delta events flow through - def _text_cb(delta: Optional[str]) -> None: - if delta is None: - return - try: - loop.call_soon_threadsafe(q.put_nowait, { - "event": "message.delta", - "run_id": run_id, - "timestamp": time.time(), - "delta": delta, - }) - except Exception: - pass - instructions = body.get("instructions") previous_response_id = body.get("previous_response_id") @@ -2434,11 +2482,42 @@ class APIServerAdapter(BasePlatformAdapter): ) conversation_history.append({"role": msg["role"], "content": str(content)}) + run_id = f"run_{uuid.uuid4().hex}" session_id = body.get("session_id") or stored_session_id or run_id ephemeral_system_prompt = instructions + loop = asyncio.get_running_loop() + q: "asyncio.Queue[Optional[Dict]]" = asyncio.Queue() + created_at = time.time() + self._run_streams[run_id] = q + self._run_streams_created[run_id] = created_at + + event_cb = 
self._make_run_event_callback(run_id, loop) + + # Also wire stream_delta_callback so message.delta events flow through. + def _text_cb(delta: Optional[str]) -> None: + if delta is None: + return + try: + loop.call_soon_threadsafe(q.put_nowait, { + "event": "message.delta", + "run_id": run_id, + "timestamp": time.time(), + "delta": delta, + }) + except Exception: + pass + + self._set_run_status( + run_id, + "queued", + created_at=created_at, + session_id=session_id, + model=body.get("model", self._model_name), + ) async def _run_and_close(): try: + self._set_run_status(run_id, "running") agent = self._create_agent( ephemeral_system_prompt=ephemeral_system_prompt, session_id=session_id, @@ -2468,8 +2547,36 @@ class APIServerAdapter(BasePlatformAdapter): "output": final_response, "usage": usage, }) + self._set_run_status( + run_id, + "completed", + output=final_response, + usage=usage, + last_event="run.completed", + ) + except asyncio.CancelledError: + self._set_run_status( + run_id, + "cancelled", + last_event="run.cancelled", + ) + try: + q.put_nowait({ + "event": "run.cancelled", + "run_id": run_id, + "timestamp": time.time(), + }) + except Exception: + pass + raise except Exception as exc: logger.exception("[api_server] run %s failed", run_id) + self._set_run_status( + run_id, + "failed", + error=str(exc), + last_event="run.failed", + ) try: q.put_nowait({ "event": "run.failed", @@ -2499,6 +2606,21 @@ class APIServerAdapter(BasePlatformAdapter): return web.json_response({"run_id": run_id, "status": "started"}, status=202) + async def _handle_get_run(self, request: "web.Request") -> "web.Response": + """GET /v1/runs/{run_id} — return pollable run status for external UIs.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + + run_id = request.match_info["run_id"] + status = self._run_statuses.get(run_id) + if status is None: + return web.json_response( + _openai_error(f"Run not found: {run_id}", code="run_not_found"), + status=404, + ) + 
return web.json_response(status) + async def _handle_run_events(self, request: "web.Request") -> "web.StreamResponse": """GET /v1/runs/{run_id}/events — SSE stream of structured agent lifecycle events.""" auth_err = self._check_auth(request) @@ -2561,6 +2683,8 @@ class APIServerAdapter(BasePlatformAdapter): if agent is None and task is None: return web.json_response(_openai_error(f"Run not found: {run_id}", code="run_not_found"), status=404) + self._set_run_status(run_id, "stopping", last_event="run.stopping") + if agent is not None: try: agent.interrupt("Stop requested via API") @@ -2603,6 +2727,15 @@ class APIServerAdapter(BasePlatformAdapter): self._active_run_agents.pop(run_id, None) self._active_run_tasks.pop(run_id, None) + stale_statuses = [ + run_id + for run_id, status in list(self._run_statuses.items()) + if status.get("status") in {"completed", "failed", "cancelled"} + and now - float(status.get("updated_at", 0) or 0) > self._RUN_STATUS_TTL + ] + for run_id in stale_statuses: + self._run_statuses.pop(run_id, None) + # ------------------------------------------------------------------ # BasePlatformAdapter interface # ------------------------------------------------------------------ @@ -2621,6 +2754,7 @@ class APIServerAdapter(BasePlatformAdapter): self._app.router.add_get("/health/detailed", self._handle_health_detailed) self._app.router.add_get("/v1/health", self._handle_health) self._app.router.add_get("/v1/models", self._handle_models) + self._app.router.add_get("/v1/capabilities", self._handle_capabilities) self._app.router.add_post("/v1/chat/completions", self._handle_chat_completions) self._app.router.add_post("/v1/responses", self._handle_responses) self._app.router.add_get("/v1/responses/{response_id}", self._handle_get_response) @@ -2636,6 +2770,7 @@ class APIServerAdapter(BasePlatformAdapter): self._app.router.add_post("/api/jobs/{job_id}/run", self._handle_run_job) # Structured event streaming self._app.router.add_post("/v1/runs", 
self._handle_runs) + self._app.router.add_get("/v1/runs/{run_id}", self._handle_get_run) self._app.router.add_get("/v1/runs/{run_id}/events", self._handle_run_events) self._app.router.add_post("/v1/runs/{run_id}/stop", self._handle_stop_run) # Start background sweep to clean up orphaned (unconsumed) run streams diff --git a/tests/gateway/test_api_server.py b/tests/gateway/test_api_server.py index 8285851064b..75386097c87 100644 --- a/tests/gateway/test_api_server.py +++ b/tests/gateway/test_api_server.py @@ -314,6 +314,7 @@ def _create_app(adapter: APIServerAdapter) -> web.Application: app.router.add_get("/health/detailed", adapter._handle_health_detailed) app.router.add_get("/v1/health", adapter._handle_health) app.router.add_get("/v1/models", adapter._handle_models) + app.router.add_get("/v1/capabilities", adapter._handle_capabilities) app.router.add_post("/v1/chat/completions", adapter._handle_chat_completions) app.router.add_post("/v1/responses", adapter._handle_responses) app.router.add_get("/v1/responses/{response_id}", adapter._handle_get_response) @@ -491,6 +492,46 @@ class TestModelsEndpoint: assert resp.status == 200 +# --------------------------------------------------------------------------- +# /v1/capabilities endpoint +# --------------------------------------------------------------------------- + + +class TestCapabilitiesEndpoint: + @pytest.mark.asyncio + async def test_capabilities_advertises_plugin_safe_contract(self, adapter): + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.get("/v1/capabilities") + assert resp.status == 200 + data = await resp.json() + assert data["object"] == "hermes.api_server.capabilities" + assert data["platform"] == "hermes-agent" + assert data["model"] == "hermes-agent" + assert data["auth"]["type"] == "bearer" + assert data["auth"]["required"] is False + assert data["features"]["chat_completions"] is True + assert data["features"]["run_status"] is True + assert 
data["features"]["run_events_sse"] is True + assert data["features"]["session_continuity_header"] == "X-Hermes-Session-Id" + assert data["endpoints"]["run_status"]["path"] == "/v1/runs/{run_id}" + + @pytest.mark.asyncio + async def test_capabilities_requires_auth_when_key_configured(self, auth_adapter): + app = _create_app(auth_adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.get("/v1/capabilities") + assert resp.status == 401 + + authed = await cli.get( + "/v1/capabilities", + headers={"Authorization": "Bearer sk-secret"}, + ) + assert authed.status == 200 + data = await authed.json() + assert data["auth"]["required"] is True + + # --------------------------------------------------------------------------- # /v1/chat/completions endpoint # --------------------------------------------------------------------------- diff --git a/tests/gateway/test_api_server_runs.py b/tests/gateway/test_api_server_runs.py index e485bad5cef..900eb3c8692 100644 --- a/tests/gateway/test_api_server_runs.py +++ b/tests/gateway/test_api_server_runs.py @@ -1,7 +1,8 @@ -"""Tests for /v1/runs endpoints: start, events, and stop. +"""Tests for /v1/runs endpoints: start, status, events, and stop. 
Covers: - POST /v1/runs — start a run (202) +- GET /v1/runs/{run_id} — poll run status - GET /v1/runs/{run_id}/events — SSE event stream - POST /v1/runs/{run_id}/stop — interrupt a running agent - Auth, error handling, and cleanup @@ -46,6 +47,7 @@ def _create_runs_app(adapter: APIServerAdapter) -> web.Application: app = web.Application(middlewares=mws) app["api_server_adapter"] = adapter app.router.add_post("/v1/runs", adapter._handle_runs) + app.router.add_get("/v1/runs/{run_id}", adapter._handle_get_run) app.router.add_get("/v1/runs/{run_id}/events", adapter._handle_run_events) app.router.add_post("/v1/runs/{run_id}/stop", adapter._handle_stop_run) return app @@ -116,6 +118,13 @@ class TestStartRun: assert data["status"] == "started" assert data["run_id"].startswith("run_") + status_resp = await cli.get(f"/v1/runs/{data['run_id']}") + assert status_resp.status == 200 + status = await status_resp.json() + assert status["run_id"] == data["run_id"] + assert status["status"] in {"queued", "running", "completed"} + assert status["object"] == "hermes.run" + @pytest.mark.asyncio async def test_start_invalid_json_returns_400(self, adapter): app = _create_runs_app(adapter) @@ -143,6 +152,18 @@ class TestStartRun: resp = await cli.post("/v1/runs", json={"input": ""}) assert resp.status == 400 + @pytest.mark.asyncio + async def test_start_invalid_history_does_not_allocate_run(self, adapter): + app = _create_runs_app(adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.post( + "/v1/runs", + json={"input": "hello", "conversation_history": {"role": "user"}}, + ) + assert resp.status == 400 + assert adapter._run_streams == {} + assert adapter._run_statuses == {} + @pytest.mark.asyncio async def test_start_requires_auth(self, auth_adapter): app = _create_runs_app(auth_adapter) @@ -170,6 +191,89 @@ class TestStartRun: assert resp.status == 202 +# --------------------------------------------------------------------------- +# GET /v1/runs/{run_id} — poll 
run status +# --------------------------------------------------------------------------- + + +class TestRunStatus: + @pytest.mark.asyncio + async def test_status_completed_run_includes_output_and_usage(self, adapter): + app = _create_runs_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(adapter, "_create_agent") as mock_create: + mock_agent = MagicMock() + mock_agent.run_conversation.return_value = {"final_response": "done"} + mock_agent.session_prompt_tokens = 4 + mock_agent.session_completion_tokens = 2 + mock_agent.session_total_tokens = 6 + mock_create.return_value = mock_agent + + resp = await cli.post("/v1/runs", json={"input": "hello"}) + data = await resp.json() + run_id = data["run_id"] + + for _ in range(20): + status_resp = await cli.get(f"/v1/runs/{run_id}") + assert status_resp.status == 200 + status = await status_resp.json() + if status["status"] == "completed": + break + await asyncio.sleep(0.05) + + assert status["status"] == "completed" + assert status["output"] == "done" + assert status["usage"]["total_tokens"] == 6 + assert status["last_event"] == "run.completed" + + @pytest.mark.asyncio + async def test_status_reflects_explicit_session_id(self, adapter): + app = _create_runs_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(adapter, "_create_agent") as mock_create: + mock_agent = MagicMock() + mock_agent.run_conversation.return_value = {"final_response": "done"} + mock_agent.session_prompt_tokens = 0 + mock_agent.session_completion_tokens = 0 + mock_agent.session_total_tokens = 0 + mock_create.return_value = mock_agent + + resp = await cli.post( + "/v1/runs", + json={"input": "hello", "session_id": "space-session"}, + ) + data = await resp.json() + run_id = data["run_id"] + + for _ in range(20): + status_resp = await cli.get(f"/v1/runs/{run_id}") + status = await status_resp.json() + if status["status"] == "completed": + break + await asyncio.sleep(0.05) + + 
mock_agent.run_conversation.assert_called_once() + # task_id stays "default" so the Runs API shares one sandbox + # container with CLI/gateway; session_id is surfaced in status + # for external UIs to correlate runs with their own session IDs. + assert mock_agent.run_conversation.call_args.kwargs["task_id"] == "default" + assert status["session_id"] == "space-session" + + @pytest.mark.asyncio + async def test_status_not_found_returns_404(self, adapter): + app = _create_runs_app(adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.get("/v1/runs/run_nonexistent") + assert resp.status == 404 + + @pytest.mark.asyncio + async def test_status_requires_auth(self, auth_adapter): + app = _create_runs_app(auth_adapter) + async with TestClient(TestServer(app)) as cli: + resp = await cli.get("/v1/runs/run_any") + assert resp.status == 401 + + # --------------------------------------------------------------------------- # GET /v1/runs/{run_id}/events — SSE event stream # --------------------------------------------------------------------------- @@ -257,6 +361,11 @@ class TestStopRun: # Agent interrupt should have been called mock_agent.interrupt.assert_called_once_with("Stop requested via API") + status_resp = await cli.get(f"/v1/runs/{run_id}") + assert status_resp.status == 200 + status_data = await status_resp.json() + assert status_data["status"] in {"stopping", "cancelled"} + # Refs should be cleaned up await asyncio.sleep(0.5) assert run_id not in adapter._active_run_agents diff --git a/website/docs/user-guide/features/api-server.md b/website/docs/user-guide/features/api-server.md index baae1d2d575..16b6eed8c7c 100644 --- a/website/docs/user-guide/features/api-server.md +++ b/website/docs/user-guide/features/api-server.md @@ -194,6 +194,29 @@ Delete a stored response. Lists the agent as an available model. The advertised model name defaults to the [profile](/docs/user-guide/profiles) name (or `hermes-agent` for the default profile). 
Required by most frontends for model discovery. +### GET /v1/capabilities + +Returns a machine-readable description of the API server's stable surface for external UIs, orchestrators, and plugin bridges. + +```json +{ + "object": "hermes.api_server.capabilities", + "platform": "hermes-agent", + "model": "hermes-agent", + "auth": {"type": "bearer", "required": true}, + "features": { + "chat_completions": true, + "responses_api": true, + "run_submission": true, + "run_status": true, + "run_events_sse": true, + "run_stop": true + } +} +``` + +The example above is abridged: the full response also reports the streaming flags (`chat_completions_streaming`, `responses_streaming`), `tool_progress_events`, `session_continuity_header`, `cors`, and an `endpoints` map of method/path pairs. Use this endpoint when integrating dashboards, browser UIs, or control planes so they can discover whether the running Hermes version supports runs, streaming, cancellation, and session continuity without depending on private Python internals. + ### GET /health Health check. Returns `{"status": "ok"}`. Also available at **GET /v1/health** for OpenAI-compatible clients that expect the `/v1/` prefix. @@ -210,10 +233,41 @@ In addition to `/v1/chat/completions` and `/v1/responses`, the server exposes a Create a new agent run. Returns a `run_id` that can be used to subscribe to progress events. +```json +{ + "run_id": "run_abc123", + "status": "started" +} +``` + +Runs accept a simple `input` string and optional `session_id`, `instructions`, `conversation_history`, and `previous_response_id` fields. When `session_id` is provided, Hermes surfaces it in the run status so external UIs can correlate runs with their own conversation IDs. + +### GET /v1/runs/\{run_id\} + +Poll the current run state. This is useful for dashboards that need status without holding an SSE connection open, or for UIs that reconnect after navigation. 
+ +```json +{ + "object": "hermes.run", + "run_id": "run_abc123", + "status": "completed", + "session_id": "space-session", + "model": "hermes-agent", + "output": "Done.", + "usage": {"input_tokens": 50, "output_tokens": 200, "total_tokens": 250} +} +``` + +Before reaching a terminal state a run reports `queued`, `running`, or `stopping`. Terminal statuses (`completed`, `failed`, or `cancelled`) are retained for about an hour after the run's last update so clients can keep polling and reconcile UI state. Every response also carries `created_at` and `updated_at` (Unix timestamps) and, once the run has emitted an event, `last_event`; a failed run adds an `error` string. + ### GET /v1/runs/\{run_id\}/events Server-Sent Events stream of the run's tool-call progress, token deltas, and lifecycle events. Designed for dashboards and thick clients that want to attach/detach without losing state. +### POST /v1/runs/\{run_id\}/stop + +Interrupt a running agent turn. The endpoint returns immediately with `{"status": "stopping"}` while Hermes asks the active agent to stop at the next safe interruption point. + ## Jobs API (background scheduled work) The server exposes a lightweight jobs CRUD surface for managing scheduled / background agent runs from a remote client. All endpoints are gated behind the same bearer auth.