Compare commits

...

1 Commits

Author SHA1 Message Date
teknium1
96d6d72cab feat: webhook adapter with response accumulator fix
Cherry-picked from PR #1124 with an important fix: the original
future-based response capture resolved on the FIRST send() call
(which was often a notification, not the agent response). Replaced
with an accumulator pattern that waits for processing to complete
and returns the LAST send() — the actual agent response.

Changes:
- gateway/platforms/webhook.py: new adapter with accumulator pattern
- gateway/config.py: Platform.WEBHOOK enum + env var handling
- gateway/run.py: factory registration + auth bypass
- gateway/session.py: webhook chat_id session isolation (like WhatsApp)
- docs/integrations/miniverse.md: integration guide
2026-03-13 07:04:16 -07:00
5 changed files with 295 additions and 3 deletions

View File

@@ -0,0 +1,38 @@
# Miniverse Integration
Connect your Hermes agents to a [Miniverse](https://github.com/ianscott313/miniverse) pixel world where they can live, work, and communicate with other AI agents.
## Overview
[hermes-miniverse](https://github.com/teknium1/hermes-miniverse) is a standalone bridge that connects Hermes Agent to Miniverse — no changes to your Hermes installation required.
```
Hermes Agent ←→ hermes-miniverse bridge ←→ Miniverse Server
```
## Features
- **Automatic presence**: Your agent appears in the pixel world with live state (working, thinking, idle)
- **Inter-agent messaging**: Other agents can message your Hermes agent and receive responses
- **Conscious interaction**: Your agent can choose to speak, message others, and join channels
- **Multiple agents**: Run several Hermes instances as different agents in the same world
## Setup
See the [hermes-miniverse README](https://github.com/teknium1/hermes-miniverse) for installation and configuration instructions.
### Components
| Component | Where | Purpose |
|-----------|-------|---------|
| `bridge.py` | Standalone daemon | Heartbeats, webhook receiver, message injection |
| `hooks/miniverse/` | `~/.hermes/hooks/` | Gateway hook for state broadcasting |
| `skill/miniverse-world/` | `~/.hermes/skills/` | Teaches agents miniverse API commands |
## Architecture
The bridge is a standalone HTTP server that sits between Hermes and Miniverse:
1. **State out** (Hermes → Miniverse): Gateway hook fires on `agent:start/step/end` → POSTs to bridge → bridge sends miniverse heartbeats
2. **Messages in** (Miniverse → Hermes): Miniverse webhooks → bridge HTTP server → injects into Hermes via CLI
3. **Agent interaction** (via skill): Agent uses `terminal` tool with `curl` commands to speak, message, observe

View File

@@ -29,6 +29,7 @@ class Platform(Enum):
SIGNAL = "signal"
HOMEASSISTANT = "homeassistant"
EMAIL = "email"
WEBHOOK = "webhook"
@dataclass
@@ -458,6 +459,19 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
name=os.getenv("EMAIL_HOME_ADDRESS_NAME", "Home"),
)
# Webhook (generic inbound HTTP adapter)
webhook_port = os.getenv("WEBHOOK_PORT")
if webhook_port:
if Platform.WEBHOOK not in config.platforms:
config.platforms[Platform.WEBHOOK] = PlatformConfig()
config.platforms[Platform.WEBHOOK].enabled = True
# Default to no auto-reset for webhook sessions — these are
# programmatic integrations (bridges, automations) where the
# caller manages lifecycle. Context is preserved until the
# caller explicitly creates a new session or chat_id.
if Platform.WEBHOOK not in config.reset_by_platform:
config.reset_by_platform[Platform.WEBHOOK] = SessionResetPolicy(mode="none")
# Session settings
idle_minutes = os.getenv("SESSION_IDLE_MINUTES")
if idle_minutes:

View File

@@ -0,0 +1,232 @@
"""Generic webhook inbound platform adapter.
Runs a lightweight HTTP server that accepts POST /message requests and
routes them through the gateway as regular conversations. Each unique
``chat_id`` in the request gets its own session — supporting multiple
concurrent agents/conversations.
The response is returned synchronously in the HTTP response body (the
connection is held open until the agent finishes). This makes it
trivially easy for external bridges, automation tools, or other agent
frameworks to integrate with Hermes.
Enable via env var:
WEBHOOK_PORT=4568 (any port number enables the adapter)
API:
POST /message
{
"chat_id": "hermes-1", // required — maps to a session
"message": "Hello!", // required — the message text
"from": "other-agent", // optional — sender display name
"user_id": "agent-123" // optional — sender ID
}
Response (200):
{
"ok": true,
"response": "Hi there!",
"session_id": "20260312_..."
}
GET /health
{"ok": true, "adapter": "webhook", "port": 4568}
"""
import asyncio
import json
import logging
import os
import time
from aiohttp import web
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
BasePlatformAdapter,
MessageEvent,
SendResult,
SessionSource,
)
logger = logging.getLogger(__name__)
def check_webhook_requirements() -> bool:
"""Webhook adapter is available when WEBHOOK_PORT is set."""
return bool(os.getenv("WEBHOOK_PORT"))
class WebhookAdapter(BasePlatformAdapter):
"""HTTP webhook adapter — accepts POST requests as inbound messages.
External services (bridges, automation tools, other agents) POST
messages and receive the agent's response in the HTTP response body.
Each chat_id maps to a separate gateway session.
"""
def __init__(self, config: PlatformConfig):
super().__init__(config, Platform.WEBHOOK)
self.port = int(os.getenv("WEBHOOK_PORT", "4568"))
self._app: web.Application = None
self._runner: web.AppRunner = None
self._site: web.TCPSite = None
# Accumulated responses keyed by session_key — we always want the LAST
# send() call (the final agent response), not intermediate notifications.
self._response_accumulators: dict[str, list[str]] = {}
self._response_events: dict[str, asyncio.Event] = {}
async def connect(self) -> bool:
"""Start the HTTP server."""
self._app = web.Application()
self._app.router.add_post("/message", self._handle_post)
self._app.router.add_get("/health", self._handle_health)
self._runner = web.AppRunner(self._app, access_log=None)
await self._runner.setup()
try:
self._site = web.TCPSite(self._runner, "0.0.0.0", self.port)
await self._site.start()
except OSError as e:
logger.error("Webhook adapter failed to bind port %d: %s", self.port, e)
return False
print(f"[webhook] Listening on port {self.port}")
print(f"[webhook] POST http://localhost:{self.port}/message")
return True
async def disconnect(self) -> None:
if self._site:
await self._site.stop()
if self._runner:
await self._runner.cleanup()
async def send(self, chat_id: str, content: str,
reply_to: str = None, metadata: dict = None) -> SendResult:
"""Accumulate responses — the last one is the final agent response."""
from gateway.session import build_session_key
# Find the accumulator for this chat_id or session_key
acc = self._response_accumulators.get(chat_id)
evt = self._response_events.get(chat_id)
if acc is None:
source = self.build_source(chat_id=chat_id, chat_type="dm")
sk = build_session_key(source)
acc = self._response_accumulators.get(sk)
evt = self._response_events.get(sk)
if acc is not None:
acc.append(content)
if evt:
evt.set() # signal that we have at least one response
return SendResult(success=True, message_id=str(int(time.time())))
async def send_typing(self, chat_id: str, metadata: dict = None) -> None:
pass # No typing indicator for webhooks
async def get_chat_info(self, chat_id: str) -> dict:
return {"id": chat_id, "name": f"webhook:{chat_id}", "type": "dm"}
# ── HTTP Handlers ────────────────────────────────────────────────────
async def _handle_post(self, request: web.Request) -> web.Response:
"""Accept an inbound message and return the agent's response."""
try:
data = await request.json()
except (json.JSONDecodeError, Exception):
return web.json_response(
{"ok": False, "error": "Invalid JSON"}, status=400
)
chat_id = data.get("chat_id", "").strip()
message = data.get("message", "").strip()
if not chat_id or not message:
return web.json_response(
{"ok": False, "error": "Missing required fields: chat_id, message"},
status=400,
)
from_name = data.get("from", "webhook")
user_id = data.get("user_id", from_name)
# Prepend sender info if provided
display_message = message
if from_name and from_name != "webhook":
display_message = f"[Message from {from_name}]: {message}"
# Build source and event
source = self.build_source(
chat_id=chat_id,
chat_type="dm",
user_id=user_id,
user_name=from_name,
)
from gateway.session import build_session_key
session_key = build_session_key(source)
event = MessageEvent(
text=display_message,
source=source,
message_id=str(int(time.time() * 1000)),
)
# Set up response accumulator — send() appends here, we take the last
response_list: list[str] = []
done_event = asyncio.Event()
self._response_accumulators[session_key] = response_list
self._response_accumulators[chat_id] = response_list
self._response_events[session_key] = done_event
self._response_events[chat_id] = done_event
# Submit the message for processing (spawns background task)
await self.handle_message(event)
# Wait for the agent to finish. The background task in
# _process_message_background removes session_key from
# _active_sessions when done. We poll for that + accumulator.
try:
deadline = asyncio.get_event_loop().time() + 300
while True:
remaining = deadline - asyncio.get_event_loop().time()
if remaining <= 0:
break
# Wait for at least one send() call
try:
await asyncio.wait_for(done_event.wait(), timeout=min(remaining, 2.0))
except asyncio.TimeoutError:
pass
# Check if the session is done processing
if session_key not in self._active_sessions and response_list:
# Processing finished — give a tiny grace period for
# any final send() calls to arrive
await asyncio.sleep(0.2)
break
if response_list:
# Return the LAST response (the final agent response)
return web.json_response({
"ok": True,
"response": response_list[-1],
"chat_id": chat_id,
})
else:
return web.json_response(
{"ok": False, "error": "Agent timed out (300s)", "chat_id": chat_id},
status=504,
)
finally:
self._response_accumulators.pop(session_key, None)
self._response_accumulators.pop(chat_id, None)
self._response_events.pop(session_key, None)
self._response_events.pop(chat_id, None)
async def _handle_health(self, request: web.Request) -> web.Response:
return web.json_response({
"ok": True,
"adapter": "webhook",
"port": self.port,
})

View File

@@ -793,6 +793,13 @@ class GatewayRunner:
return None
return EmailAdapter(config)
elif platform == Platform.WEBHOOK:
from gateway.platforms.webhook import WebhookAdapter, check_webhook_requirements
if not check_webhook_requirements():
logger.warning("Webhook: WEBHOOK_PORT not set")
return None
return WebhookAdapter(config)
return None
def _is_user_authorized(self, source: SessionSource) -> bool:
@@ -809,7 +816,8 @@ class GatewayRunner:
# Home Assistant events are system-generated (state changes), not
# user-initiated messages. The HASS_TOKEN already authenticates the
# connection, so HA events are always authorized.
if source.platform == Platform.HOMEASSISTANT:
# Webhook messages are local HTTP requests — authorized by default.
if source.platform in (Platform.HOMEASSISTANT, Platform.WEBHOOK):
return True
user_id = source.user_id

View File

@@ -301,7 +301,7 @@ def build_session_key(source: SessionSource) -> str:
This is the single source of truth for session key construction.
DM rules:
- WhatsApp DMs include chat_id (multi-user support).
- WhatsApp and Webhook DMs include chat_id (multi-session support).
- Other DMs include thread_id when present (e.g. Slack threaded DMs),
so each DM thread gets its own session while top-level DMs share one.
- Without thread_id or chat_id, all DMs share a single session.
@@ -314,7 +314,7 @@ def build_session_key(source: SessionSource) -> str:
if source.chat_type == "dm":
if source.thread_id:
return f"agent:main:{platform}:dm:{source.thread_id}"
if platform == "whatsapp" and source.chat_id:
if platform in ("whatsapp", "webhook") and source.chat_id:
return f"agent:main:{platform}:dm:{source.chat_id}"
return f"agent:main:{platform}:dm"
if source.thread_id: