Compare commits

...

6 Commits

Author SHA1 Message Date
Teknium
d1c7fa1907 feat: dynamic toolset generation for plugin platforms
Plugin platforms now get full toolset support without any entries in
toolsets.py.

tools_config._get_platform_tools(): Falls back to 'hermes-<name>'
  when the platform isn't in the static PLATFORMS dict. No more
  KeyError for plugin platforms.

toolsets.resolve_toolset(): Auto-generates a toolset for plugin
  platforms (hermes-<name>) containing _HERMES_CORE_TOOLS plus any
  tools the plugin registered into a matching toolset name. This means
  a plugin can call ctx.register_tool(toolset='irc', ...) and those
  tools will be included in the hermes-irc toolset automatically.

webhook.py: Registry-aware cross-platform delivery.
run_agent.py: Platform hints from plugin registry.
IRC adapter: Token lock + platform hint.
Removed dead token-empty-warning extension.
Updated docs.
2026-04-11 19:36:48 -07:00
Teknium
2a304e5de4 feat: final platform plugin parity — webhook delivery, platform hints, docs
Closes remaining functional gaps and adds documentation.

## Functional fixes

webhook.py: Cross-platform delivery now checks the plugin registry
  for unknown platform names instead of hardcoding 15 names in a tuple.
  Plugin platforms can receive webhook-routed deliveries.

prompt_builder: Platform hints (system prompt LLM guidance) now fall
  back to the plugin registry's platform_hint field. Plugin platforms
  can tell the LLM 'you're on IRC, no markdown.'

PlatformEntry: Added platform_hint field for LLM guidance injection.

IRC adapter: Added acquire_scoped_lock/release_scoped_lock in
  connect/disconnect to prevent two profiles from using the same IRC
  identity. Added platform_hint for IRC-specific LLM guidance.

Removed dead token-empty-warning extension for plugin platforms
  (plugin adapters handle their own env vars via check_fn).

## Documentation

website/docs/developer-guide/adding-platform-adapters.md:
  - Added 'Plugin Path (Recommended)' section with full code examples,
    PLUGIN.yaml template, config.yaml examples, and a table showing all
    18 integration points the plugin system handles automatically
  - Renamed built-in checklist to clarify it's for core contributors

gateway/platforms/ADDING_A_PLATFORM.md:
  - Added Plugin Path section pointing to the reference implementation
    and full docs guide
  - Clarified built-in path is for core contributors only
2026-04-11 19:27:04 -07:00
Teknium
e7fc6450fc Merge remote-tracking branch 'origin/main' into hermes/hermes-1f7bfa9e
# Conflicts:
#	cron/scheduler.py
#	tools/send_message_tool.py
2026-04-11 19:23:02 -07:00
Teknium
195547609a fix: wire PII redaction + token empty warnings for plugin platforms
PII redaction: build_session_context_prompt() now checks the plugin
registry's pii_safe flag in addition to the hardcoded _PII_SAFE_PLATFORMS
frozenset. Plugin platforms that set pii_safe=True (e.g. phone-based
messaging bridges) get their user IDs redacted before LLM context.

Token empty warnings: the empty-token diagnostic at config load now
checks the plugin registry's required_env when a platform isn't in the
hardcoded _token_env_names dict. Catches 'enabled but empty' for
plugin platforms too.
2026-04-11 15:34:59 -07:00
Teknium
5bb8cd132f feat: complete plugin platform parity — all 12 integration points
Extends the platform plugin interface from Phase 1 to cover every
touchpoint where built-in platforms have hardcoded behavior.

## PlatformEntry extended fields
- allowed_users_env / allow_all_env: per-platform auth env vars
- max_message_length: smart-chunking for send_message tool
- pii_safe: session PII redaction flag
- emoji: CLI/gateway display
- allow_update_command: /update access control

## Functional fixes (Tier 1)

send_message tool (tools/send_message_tool.py):
- Replaced hardcoded platform_map dict with Platform() call
- Added _send_via_adapter() for plugin platforms — routes through
  live gateway adapter when available
- Registry-aware max message length for smart chunking

Cron delivery (cron/scheduler.py):
- Replaced hardcoded 15-entry platform_map with Platform() call
- Plugin platforms now work as cron delivery targets

User authorization (gateway/run.py _is_user_authorized):
- Registry fallback: checks PlatformEntry.allowed_users_env and
  allow_all_env when platform not in hardcoded maps
- Plugin platforms get per-platform auth support

## Integration fixes (Tier 2)

_UPDATE_ALLOWED_PLATFORMS: checks registry allow_update_command flag
Channel directory: includes plugin platforms in session enumeration
Orphaned config warning: descriptive message when plugin platform is
  in config but no plugin registered it
Gateway weakref: _gateway_runner_ref for cross-module adapter access

## UX completeness

hermes status: shows plugin platforms with (plugin) tag
hermes gateway setup: plugin platforms appear in menu with setup hints
hermes_cli/platforms.py: get_all_platforms() merges with registry,
  platform_label() falls back to registry for plugin names

## Tests
- 8 new tests (extended fields, cron resolution, platforms merge)
- Updated 3 tests for new Platform() based resolution
- 2829 passed, 24 pre-existing failures, zero new failures
2026-04-11 15:10:03 -07:00
Teknium
e1f4de4dd3 feat: pluggable platform adapter registry + IRC reference implementation
Adds a platform adapter plugin interface so anyone can create new gateway
platforms (IRC, Viber, Line, etc.) as drop-in plugins without modifying
core gateway code.

## Platform Registry (gateway/platform_registry.py)
- PlatformEntry dataclass: name, label, adapter_factory, check_fn,
  validate_config, required_env, install_hint, source
- PlatformRegistry singleton with register/unregister/create_adapter
- _create_adapter() in gateway/run.py checks registry first, falls
  through to existing if/elif chain for built-in platforms

## Dynamic Platform Enum (gateway/config.py)
- Platform._missing_() accepts unknown string values, creating cached
  pseudo-members so Platform('irc') is Platform('irc') holds true
- GatewayConfig.from_dict() now parses plugin platform names from
  config.yaml without rejecting them
- get_connected_platforms() delegates to registry for unknown platforms

## Plugin Registration (hermes_cli/plugins.py)
- PluginContext.register_platform() for plugin authors
- Mirrors the existing register_tool() / register_hook() pattern

## IRC Reference Plugin (plugins/platforms/irc/)
- Full async IRC adapter using stdlib asyncio (zero external deps)
- Connects via TLS, handles PING/PONG, nick collision, NickServ auth
- Channel messages require addressing (nick: msg), DMs always dispatch
- Markdown stripping for IRC-clean output, message splitting for
  512-byte line limit
- Config via config.yaml extra dict or IRC_* env vars

## Tests (55 new tests)
- Platform enum dynamic members (identity stability, case normalization)
- PlatformRegistry (register, unregister, create, validation, factory)
- GatewayConfig integration (from_dict parsing, get_connected_platforms)
- IRC adapter (init, send, protocol parsing, markdown, requirements)

No existing platform adapters were migrated — the if/elif chain is
untouched. This is Phase 1: prove the interface with a real plugin.
2026-04-11 14:25:11 -07:00
25 changed files with 2080 additions and 105 deletions

View File

@@ -222,26 +222,11 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
from tools.send_message_tool import _send_to_platform
from gateway.config import load_gateway_config, Platform
platform_map = {
"telegram": Platform.TELEGRAM,
"discord": Platform.DISCORD,
"slack": Platform.SLACK,
"whatsapp": Platform.WHATSAPP,
"signal": Platform.SIGNAL,
"matrix": Platform.MATRIX,
"mattermost": Platform.MATTERMOST,
"homeassistant": Platform.HOMEASSISTANT,
"dingtalk": Platform.DINGTALK,
"feishu": Platform.FEISHU,
"wecom": Platform.WECOM,
"wecom_callback": Platform.WECOM_CALLBACK,
"weixin": Platform.WEIXIN,
"email": Platform.EMAIL,
"sms": Platform.SMS,
"bluebubbles": Platform.BLUEBUBBLES,
}
platform = platform_map.get(platform_name.lower())
if not platform:
# Accept any platform name — built-in names resolve to their enum
# member, plugin platform names create dynamic members via _missing_().
try:
platform = Platform(platform_name.lower())
except (ValueError, KeyError):
msg = f"unknown platform '{platform_name}'"
logger.warning("Job '%s': %s", job["id"], msg)
return msg

View File

@@ -86,6 +86,16 @@ def build_channel_directory(adapters: Dict[Any, Any]) -> Dict[str, Any]:
continue
platforms[plat_name] = _build_from_sessions(plat_name)
# Include plugin-registered platforms (dynamic enum members aren't in
# Platform.__members__, so the loop above misses them).
try:
from gateway.platform_registry import platform_registry
for entry in platform_registry.plugin_entries():
if entry.name not in _SKIP_SESSION_DISCOVERY and entry.name not in platforms:
platforms[entry.name] = _build_from_sessions(entry.name)
except Exception:
pass
directory = {
"updated_at": datetime.now().isoformat(),
"platforms": platforms,

View File

@@ -46,7 +46,13 @@ def _normalize_unauthorized_dm_behavior(value: Any, default: str = "pair") -> st
class Platform(Enum):
"""Supported messaging platforms."""
"""Supported messaging platforms.
Built-in platforms have explicit members. Plugin platforms use dynamic
members created on-demand by ``_missing_()`` so that
``Platform("irc")`` works without modifying this enum. Dynamic members
are cached in ``_value2member_map_`` for identity-stable comparisons.
"""
LOCAL = "local"
TELEGRAM = "telegram"
DISCORD = "discord"
@@ -67,6 +73,28 @@ class Platform(Enum):
WEIXIN = "weixin"
BLUEBUBBLES = "bluebubbles"
@classmethod
def _missing_(cls, value):
"""Accept unknown platform names for plugin-registered adapters.
Creates a pseudo-member cached in ``_value2member_map_`` so that
``Platform("irc") is Platform("irc")`` holds True (identity-stable).
"""
if not isinstance(value, str) or not value.strip():
return None
# Normalise to lowercase to avoid case mismatches in config
value = value.strip().lower()
# Check cache first (another call may have created it already)
if value in cls._value2member_map_:
return cls._value2member_map_[value]
pseudo = object.__new__(cls)
pseudo._value_ = value
pseudo._name_ = value.upper().replace("-", "_").replace(" ", "_")
# Cache so future lookups return the same object
cls._value2member_map_[value] = pseudo
cls._member_map_[pseudo._name_] = pseudo
return pseudo
@dataclass
class HomeChannel:
@@ -303,6 +331,17 @@ class GatewayConfig:
# BlueBubbles uses extra dict for local server config
elif platform == Platform.BLUEBUBBLES and config.extra.get("server_url") and config.extra.get("password"):
connected.append(platform)
else:
# Plugin-registered platform — delegate validation to the
# registry entry's validate_config if available.
try:
from gateway.platform_registry import platform_registry
entry = platform_registry.get(platform.value)
if entry:
if entry.validate_config is None or entry.validate_config(config):
connected.append(platform)
except Exception:
pass # Registry not yet initialised during early import
return connected
def get_home_channel(self, platform: Platform) -> Optional[HomeChannel]:

View File

@@ -0,0 +1,196 @@
"""
Platform Adapter Registry
Allows platform adapters (built-in and plugin) to self-register so the gateway
can discover and instantiate them without hardcoded if/elif chains.
Built-in adapters continue to use the existing if/elif in _create_adapter()
for now. Plugin adapters register here via PluginContext.register_platform()
and are looked up first -- if nothing is found the gateway falls through to
the legacy code path.
Usage (plugin side):
from gateway.platform_registry import platform_registry, PlatformEntry
platform_registry.register(PlatformEntry(
name="irc",
label="IRC",
adapter_factory=lambda cfg: IRCAdapter(cfg),
check_fn=check_requirements,
validate_config=lambda cfg: bool(cfg.extra.get("server")),
required_env=["IRC_SERVER"],
install_hint="pip install irc",
))
Usage (gateway side):
adapter = platform_registry.create_adapter("irc", platform_config)
"""
import logging
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
logger = logging.getLogger(__name__)
@dataclass
class PlatformEntry:
"""Metadata and factory for a single platform adapter."""
# Identifier used in config.yaml (e.g. "irc", "viber").
name: str
# Human-readable label (e.g. "IRC", "Viber").
label: str
# Factory callable: receives a PlatformConfig, returns an adapter instance.
# Using a factory instead of a bare class lets plugins do custom init
# (e.g. passing extra kwargs, wrapping in try/except).
adapter_factory: Callable[[Any], Any]
# Returns True when the platform's dependencies are available.
check_fn: Callable[[], bool]
# Optional: given a PlatformConfig, is it properly configured?
# If None, the registry skips config validation and lets the adapter
# fail at connect() time with a descriptive error.
validate_config: Optional[Callable[[Any], bool]] = None
# Env vars this platform needs (for ``hermes setup`` display).
required_env: list = field(default_factory=list)
# Hint shown when check_fn returns False.
install_hint: str = ""
# "builtin" or "plugin"
source: str = "plugin"
# ── Auth env var names (for _is_user_authorized integration) ──
# E.g. "IRC_ALLOWED_USERS" — checked for comma-separated user IDs.
allowed_users_env: str = ""
# E.g. "IRC_ALLOW_ALL_USERS" — if truthy, all users authorized.
allow_all_env: str = ""
# ── Message limits ──
# Max message length for smart-chunking. 0 = no limit.
max_message_length: int = 0
# ── Privacy ──
# If True, session descriptions redact PII (phone numbers, etc.)
pii_safe: bool = False
# ── Display ──
# Emoji for CLI/gateway display (e.g. "💬")
emoji: str = "🔌"
# Whether this platform should appear in _UPDATE_ALLOWED_PLATFORMS
# (allows /update command from this platform).
allow_update_command: bool = True
# ── LLM guidance ──
# Platform hint injected into the system prompt (e.g. "You are on IRC.
# Do not use markdown."). Empty string = no hint.
platform_hint: str = ""
class PlatformRegistry:
"""Central registry of platform adapters.
Thread-safe for reads (dict lookups are atomic under GIL).
Writes happen at startup during sequential discovery.
"""
def __init__(self) -> None:
self._entries: dict[str, PlatformEntry] = {}
def register(self, entry: PlatformEntry) -> None:
"""Register a platform adapter entry.
If an entry with the same name exists, it is replaced (last writer
wins -- this lets plugins override built-in adapters if desired).
"""
if entry.name in self._entries:
prev = self._entries[entry.name]
logger.info(
"Platform '%s' re-registered (was %s, now %s)",
entry.name,
prev.source,
entry.source,
)
self._entries[entry.name] = entry
logger.debug("Registered platform adapter: %s (%s)", entry.name, entry.source)
def unregister(self, name: str) -> bool:
"""Remove a platform entry. Returns True if it existed."""
return self._entries.pop(name, None) is not None
def get(self, name: str) -> Optional[PlatformEntry]:
"""Look up a platform entry by name."""
return self._entries.get(name)
def all_entries(self) -> list[PlatformEntry]:
"""Return all registered platform entries."""
return list(self._entries.values())
def plugin_entries(self) -> list[PlatformEntry]:
"""Return only plugin-registered platform entries."""
return [e for e in self._entries.values() if e.source == "plugin"]
def is_registered(self, name: str) -> bool:
return name in self._entries
def create_adapter(self, name: str, config: Any) -> Optional[Any]:
"""Create an adapter instance for the given platform name.
Returns None if:
- No entry registered for *name*
- check_fn() returns False (missing deps)
- validate_config() returns False (misconfigured)
- The factory raises an exception
"""
entry = self._entries.get(name)
if entry is None:
return None
if not entry.check_fn():
hint = f" ({entry.install_hint})" if entry.install_hint else ""
logger.warning(
"Platform '%s' requirements not met%s",
entry.label,
hint,
)
return None
if entry.validate_config is not None:
try:
if not entry.validate_config(config):
logger.warning(
"Platform '%s' config validation failed",
entry.label,
)
return None
except Exception as e:
logger.warning(
"Platform '%s' config validation error: %s",
entry.label,
e,
)
return None
try:
adapter = entry.adapter_factory(config)
return adapter
except Exception as e:
logger.error(
"Failed to create adapter for platform '%s': %s",
entry.label,
e,
exc_info=True,
)
return None
# Module-level singleton
platform_registry = PlatformRegistry()

View File

@@ -1,9 +1,30 @@
# Adding a New Messaging Platform
Checklist for integrating a new messaging platform into the Hermes gateway.
Use this as a reference when building a new adapter — every item here is a
real integration point that exists in the codebase. Missing any of them will
cause broken functionality, missing features, or inconsistent behavior.
There are two ways to add a platform to the Hermes gateway:
## Plugin Path (Recommended for Community/Third-Party)
Create a plugin directory in `~/.hermes/plugins/` with a `PLUGIN.yaml` and
`adapter.py`. The adapter inherits from `BasePlatformAdapter` and registers
via `ctx.register_platform()` in the `register(ctx)` entry point. This
requires **zero changes to core Hermes code**.
The plugin system automatically handles: adapter creation, config parsing,
user authorization, cron delivery, send_message routing, system prompt hints,
status display, gateway setup, and more.
See `plugins/platforms/irc/` for a complete reference implementation, and
`website/docs/developer-guide/adding-platform-adapters.md` for the full
plugin guide with code examples.
---
## Built-in Path (Core Contributors Only)
Checklist for integrating a platform directly into the Hermes core.
Use this as a reference when building a built-in adapter — every item here
is a real integration point. Missing any of them will cause broken
functionality, missing features, or inconsistent behavior.
---

View File

@@ -186,25 +186,21 @@ class WebhookAdapter(BasePlatformAdapter):
if deliver_type == "github_comment":
return await self._deliver_github_comment(content, delivery)
# Cross-platform delivery — any platform with a gateway adapter
if self.gateway_runner and deliver_type in (
"telegram",
"discord",
"slack",
"signal",
"sms",
"whatsapp",
"matrix",
"mattermost",
"homeassistant",
"email",
"dingtalk",
"feishu",
"wecom",
"wecom_callback",
"weixin",
"bluebubbles",
):
# Cross-platform delivery — any platform with a gateway adapter.
# Check both built-in names and plugin-registered platforms.
_BUILTIN_DELIVER_PLATFORMS = {
"telegram", "discord", "slack", "signal", "sms", "whatsapp",
"matrix", "mattermost", "homeassistant", "email", "dingtalk",
"feishu", "wecom", "wecom_callback", "weixin", "bluebubbles",
}
_is_known_platform = deliver_type in _BUILTIN_DELIVER_PLATFORMS
if not _is_known_platform:
try:
from gateway.platform_registry import platform_registry
_is_known_platform = platform_registry.is_registered(deliver_type)
except Exception:
pass
if self.gateway_runner and _is_known_platform:
return await self._deliver_cross_platform(
deliver_type, content, delivery
)

View File

@@ -498,6 +498,13 @@ def _format_gateway_process_notification(evt: dict) -> "str | None":
return None
# Module-level weak reference to the active GatewayRunner instance.
# Used by tools (e.g. send_message) that need to route through a live
# adapter for plugin platforms. Set in GatewayRunner.__init__().
import weakref as _weakref
_gateway_runner_ref: _weakref.ref = lambda: None
class GatewayRunner:
"""
Main gateway controller.
@@ -521,8 +528,10 @@ class GatewayRunner:
_session_model_overrides: Dict[str, Dict[str, str]] = {}
def __init__(self, config: Optional[GatewayConfig] = None):
global _gateway_runner_ref
self.config = config or load_gateway_config()
self.adapters: Dict[Platform, BasePlatformAdapter] = {}
_gateway_runner_ref = _weakref.ref(self)
# Load ephemeral config from config.yaml / env vars.
# Both are injected at API-call time only and never persisted.
@@ -1512,7 +1521,17 @@ class GatewayRunner:
adapter = self._create_adapter(platform, platform_config)
if not adapter:
logger.warning("No adapter available for %s", platform.value)
# Distinguish between missing builtin deps and missing plugin
_pval = platform.value
_builtin_names = {m.value for m in Platform.__members__.values()}
if _pval not in _builtin_names:
logger.warning(
"No adapter for '%s' — is the plugin installed? "
"(platform is enabled in config.yaml but no plugin registered it)",
_pval,
)
else:
logger.warning("No adapter available for %s", _pval)
continue
# Set up message + fatal error handlers
@@ -2043,7 +2062,11 @@ class GatewayRunner:
platform: Platform,
config: Any
) -> Optional[BasePlatformAdapter]:
"""Create the appropriate adapter for a platform."""
"""Create the appropriate adapter for a platform.
Checks the platform_registry first (plugin adapters), then falls
through to the built-in if/elif chain for core platforms.
"""
if hasattr(config, "extra") and isinstance(config.extra, dict):
config.extra.setdefault(
"group_sessions_per_user",
@@ -2054,6 +2077,16 @@ class GatewayRunner:
getattr(self.config, "thread_sessions_per_user", False),
)
# ── Plugin-registered platforms (checked first) ──────────────
try:
from gateway.platform_registry import platform_registry
adapter = platform_registry.create_adapter(platform.value, config)
if adapter is not None:
return adapter
except Exception as e:
logger.debug("Platform registry lookup for '%s' failed: %s", platform.value, e)
# Fall through to built-in adapters below
if platform == Platform.TELEGRAM:
from gateway.platforms.telegram import TelegramAdapter, check_telegram_requirements
if not check_telegram_requirements():
@@ -2245,6 +2278,19 @@ class GatewayRunner:
Platform.BLUEBUBBLES: "BLUEBUBBLES_ALLOW_ALL_USERS",
}
# Plugin platforms: check the registry for auth env var names
if source.platform not in platform_env_map:
try:
from gateway.platform_registry import platform_registry
entry = platform_registry.get(source.platform.value)
if entry:
if entry.allowed_users_env:
platform_env_map[source.platform] = entry.allowed_users_env
if entry.allow_all_env:
platform_allow_all_map[source.platform] = entry.allow_all_env
except Exception:
pass
# Per-platform allow-all flag (e.g., DISCORD_ALLOW_ALL_USERS=true)
platform_allow_all_var = platform_allow_all_map.get(source.platform, "")
if platform_allow_all_var and os.getenv(platform_allow_all_var, "").lower() in ("true", "1", "yes"):
@@ -6350,8 +6396,16 @@ class GatewayRunner:
# Block non-messaging platforms (API server, webhooks, ACP)
platform = event.source.platform
if platform not in self._UPDATE_ALLOWED_PLATFORMS:
return "✗ /update is only available from messaging platforms. Run `hermes update` from the terminal."
_allowed = self._UPDATE_ALLOWED_PLATFORMS
# Plugin platforms with allow_update_command=True are also allowed
if platform not in _allowed:
try:
from gateway.platform_registry import platform_registry
entry = platform_registry.get(platform.value)
if not entry or not entry.allow_update_command:
return "✗ /update is only available from messaging platforms. Run `hermes update` from the terminal."
except Exception:
return "✗ /update is only available from messaging platforms. Run `hermes update` from the terminal."
if is_managed():
return f"{format_managed_message('update Hermes Agent')}"

View File

@@ -203,8 +203,18 @@ def build_session_context_prompt(
Platforms like Discord are excluded because mentions need real IDs.
Routing still uses the original values (they stay in SessionSource).
"""
# Only apply redaction on platforms where IDs aren't needed for mentions
redact_pii = redact_pii and context.source.platform in _PII_SAFE_PLATFORMS
# Only apply redaction on platforms where IDs aren't needed for mentions.
# Check both the hardcoded set (builtins) and the plugin registry.
_is_pii_safe = context.source.platform in _PII_SAFE_PLATFORMS
if not _is_pii_safe:
try:
from gateway.platform_registry import platform_registry
entry = platform_registry.get(context.source.platform.value)
if entry and entry.pii_safe:
_is_pii_safe = True
except Exception:
pass
redact_pii = redact_pii and _is_pii_safe
lines = [
"## Current Session Context",
"",

View File

@@ -45,6 +45,8 @@ _EXTRA_ENV_KEYS = frozenset({
"WEIXIN_HOME_CHANNEL", "WEIXIN_HOME_CHANNEL_NAME", "WEIXIN_DM_POLICY", "WEIXIN_GROUP_POLICY",
"WEIXIN_ALLOWED_USERS", "WEIXIN_GROUP_ALLOWED_USERS", "WEIXIN_ALLOW_ALL_USERS",
"BLUEBUBBLES_SERVER_URL", "BLUEBUBBLES_PASSWORD",
"IRC_SERVER", "IRC_PORT", "IRC_NICKNAME", "IRC_CHANNEL",
"IRC_USE_TLS", "IRC_SERVER_PASSWORD", "IRC_NICKSERV_PASSWORD",
"TERMINAL_ENV", "TERMINAL_SSH_KEY", "TERMINAL_SSH_PORT",
"WHATSAPP_MODE", "WHATSAPP_ENABLED",
"MATTERMOST_HOME_CHANNEL", "MATTERMOST_REPLY_MODE",
@@ -1306,6 +1308,43 @@ OPTIONAL_ENV_VARS = {
"password": False,
"category": "messaging",
},
"IRC_SERVER": {
"description": "IRC server hostname (e.g. irc.libera.chat)",
"prompt": "IRC server",
"url": None,
"password": False,
"category": "messaging",
},
"IRC_CHANNEL": {
"description": "IRC channel to join (e.g. #hermes)",
"prompt": "IRC channel",
"url": None,
"password": False,
"category": "messaging",
},
"IRC_NICKNAME": {
"description": "Bot nickname on IRC (default: hermes-bot)",
"prompt": "IRC nickname",
"url": None,
"password": False,
"category": "messaging",
},
"IRC_SERVER_PASSWORD": {
"description": "IRC server password (if required)",
"prompt": "IRC server password",
"url": None,
"password": True,
"category": "messaging",
"advanced": True,
},
"IRC_NICKSERV_PASSWORD": {
"description": "NickServ password for nick identification",
"prompt": "NickServ password",
"url": None,
"password": True,
"category": "messaging",
"advanced": True,
},
"GATEWAY_ALLOW_ALL_USERS": {
"description": "Allow all users to interact with messaging bots (true/false). Default: false.",
"prompt": "Allow all users (true/false)",

View File

@@ -2448,27 +2448,53 @@ def gateway_setup():
print()
print_header("Messaging Platforms")
# Build menu from built-in platforms + plugin platforms
_plugin_entries = []
try:
from gateway.platform_registry import platform_registry
_plugin_entries = platform_registry.plugin_entries()
except Exception:
pass
menu_items = []
for plat in _PLATFORMS:
status = _platform_status(plat)
menu_items.append(f"{plat['label']} ({status})")
for pentry in _plugin_entries:
configured = pentry.check_fn()
status_str = "configured" if configured else "not configured"
menu_items.append(f"{pentry.emoji} {pentry.label} ({status_str}) [plugin]")
menu_items.append("Done")
_total_platforms = len(_PLATFORMS) + len(_plugin_entries)
choice = prompt_choice("Select a platform to configure:", menu_items, len(menu_items) - 1)
if choice == len(_PLATFORMS):
if choice == _total_platforms:
break
platform = _PLATFORMS[choice]
if choice < len(_PLATFORMS):
platform = _PLATFORMS[choice]
if platform["key"] == "whatsapp":
_setup_whatsapp()
elif platform["key"] == "signal":
_setup_signal()
elif platform["key"] == "weixin":
_setup_weixin()
if platform["key"] == "whatsapp":
_setup_whatsapp()
elif platform["key"] == "signal":
_setup_signal()
elif platform["key"] == "weixin":
_setup_weixin()
else:
_setup_standard_platform(platform)
else:
_setup_standard_platform(platform)
# Plugin platform — show env var setup instructions
pentry = _plugin_entries[choice - len(_PLATFORMS)]
print(f"\n {pentry.label} (plugin platform)")
if pentry.required_env:
print(f" Required env vars: {', '.join(pentry.required_env)}")
print(f" Set these in ~/.hermes/.env or config.yaml gateway.platforms.{pentry.name}.extra")
else:
print(f" Configure in config.yaml under gateway.platforms.{pentry.name}")
if pentry.install_hint:
print(f" {pentry.install_hint}")
print()
# ── Post-setup: offer to install/restart gateway ──
any_configured = any(

View File

@@ -41,6 +41,40 @@ PLATFORMS: OrderedDict[str, PlatformInfo] = OrderedDict([
def platform_label(key: str, default: str = "") -> str:
"""Return the display label for a platform key, or *default*."""
"""Return the display label for a platform key, or *default*.
Checks the static PLATFORMS dict first, then the plugin platform
registry for dynamically registered platforms.
"""
info = PLATFORMS.get(key)
return info.label if info is not None else default
if info is not None:
return info.label
# Check plugin registry
try:
from gateway.platform_registry import platform_registry
entry = platform_registry.get(key)
if entry:
return f"{entry.emoji} {entry.label}" if entry.emoji else entry.label
except Exception:
pass
return default
def get_all_platforms() -> "OrderedDict[str, PlatformInfo]":
"""Return PLATFORMS merged with any plugin-registered platforms.
Plugin platforms are appended after builtins. This is the function
that tools_config and skills_config should use for platform menus.
"""
merged = OrderedDict(PLATFORMS)
try:
from gateway.platform_registry import platform_registry
for entry in platform_registry.plugin_entries():
if entry.name not in merged:
merged[entry.name] = PlatformInfo(
label=f"{entry.emoji} {entry.label}" if entry.emoji else entry.label,
default_toolset=f"hermes-{entry.name}",
)
except Exception:
pass
return merged

View File

@@ -244,6 +244,53 @@ class PluginContext:
self.manifest.name, engine.name,
)
# -- platform adapter registration ---------------------------------------
def register_platform(
self,
name: str,
label: str,
adapter_factory: Callable,
check_fn: Callable,
validate_config: Callable | None = None,
required_env: list | None = None,
install_hint: str = "",
) -> None:
"""Register a gateway platform adapter.
The adapter_factory receives a ``PlatformConfig`` and returns a
``BasePlatformAdapter`` subclass instance. The gateway calls
``check_fn()`` before instantiation to verify dependencies.
Example::
ctx.register_platform(
name="irc",
label="IRC",
adapter_factory=lambda cfg: IRCAdapter(cfg),
check_fn=lambda: True,
)
"""
from gateway.platform_registry import platform_registry, PlatformEntry
entry = PlatformEntry(
name=name,
label=label,
adapter_factory=adapter_factory,
check_fn=check_fn,
validate_config=validate_config,
required_env=required_env or [],
install_hint=install_hint,
source="plugin",
)
platform_registry.register(entry)
self._manager._plugin_platform_names.add(name)
logger.debug(
"Plugin %s registered platform: %s",
self.manifest.name,
name,
)
# -- hook registration --------------------------------------------------
def register_hook(self, hook_name: str, callback: Callable) -> None:
@@ -275,6 +322,7 @@ class PluginManager:
self._plugins: Dict[str, LoadedPlugin] = {}
self._hooks: Dict[str, List[Callable]] = {}
self._plugin_tool_names: Set[str] = set()
self._plugin_platform_names: Set[str] = set()
self._cli_commands: Dict[str, dict] = {}
self._context_engine = None # Set by a plugin via register_context_engine()
self._discovered: bool = False

View File

@@ -320,6 +320,17 @@ def show_status(args):
status += f" (home: {home_channel})"
print(f" {name:<12} {check_mark(has_token)} {status}")
# Plugin-registered platforms
try:
from gateway.platform_registry import platform_registry
for entry in platform_registry.plugin_entries():
configured = entry.check_fn()
status_str = "configured" if configured else "not configured"
label = entry.label
print(f" {label:<12} {check_mark(configured)} {status_str} (plugin)")
except Exception:
pass
# =========================================================================
# Gateway Status

View File

@@ -475,7 +475,12 @@ def _get_platform_tools(
toolset_names = platform_toolsets.get(platform)
if toolset_names is None or not isinstance(toolset_names, list):
default_ts = PLATFORMS[platform]["default_toolset"]
plat_info = PLATFORMS.get(platform)
if plat_info:
default_ts = plat_info["default_toolset"]
else:
# Plugin platform — derive toolset name from platform key
default_ts = f"hermes-{platform}"
toolset_names = [default_ts]
# YAML may parse bare numeric names (e.g. ``12306:``) as int.

View File

@@ -0,0 +1,12 @@
name: irc-platform
version: 1.0.0
description: >
IRC gateway adapter for Hermes Agent.
Connects to an IRC server and relays messages between an IRC channel
(or DMs) and the Hermes agent. No external dependencies — uses
Python's stdlib asyncio for the IRC protocol.
author: Nous Research
requires_env:
- IRC_SERVER
- IRC_CHANNEL
- IRC_NICKNAME

View File

@@ -0,0 +1,530 @@
"""
IRC Platform Adapter for Hermes Agent.
A plugin-based gateway adapter that connects to an IRC server and relays
messages to/from the Hermes agent. Zero external dependencies — uses
Python's stdlib asyncio for the IRC protocol.
Configuration in config.yaml::
gateway:
platforms:
irc:
enabled: true
extra:
server: irc.libera.chat
port: 6697
nickname: hermes-bot
channel: "#hermes"
use_tls: true
server_password: "" # optional server password
nickserv_password: "" # optional NickServ identification
allowed_users: [] # empty = allow all, or list of nicks
max_message_length: 450 # IRC line limit (safe default)
Or via environment variables (overrides config.yaml):
IRC_SERVER, IRC_PORT, IRC_NICKNAME, IRC_CHANNEL, IRC_USE_TLS,
IRC_SERVER_PASSWORD, IRC_NICKSERV_PASSWORD
"""
import asyncio
import logging
import os
import re
import ssl
import time
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Lazy import: BasePlatformAdapter and friends live in the main repo.
# We import at function/class level to avoid import errors when the plugin
# is discovered but the gateway hasn't been fully initialised yet.
# ---------------------------------------------------------------------------
from gateway.platforms.base import (
BasePlatformAdapter,
SendResult,
MessageEvent,
MessageType,
)
from gateway.session import SessionSource
from gateway.config import PlatformConfig, Platform
def _ensure_imports():
"""No-op — kept for backward compatibility with any call sites."""
pass
# ---------------------------------------------------------------------------
# IRC protocol helpers
# ---------------------------------------------------------------------------
def _parse_irc_message(raw: str) -> dict:
"""Parse a raw IRC protocol line into components.
Returns dict with keys: prefix, command, params.
"""
prefix = ""
trailing = ""
if raw.startswith(":"):
prefix, raw = raw[1:].split(" ", 1)
if " :" in raw:
raw, trailing = raw.split(" :", 1)
parts = raw.split()
command = parts[0] if parts else ""
params = parts[1:] if len(parts) > 1 else []
if trailing:
params.append(trailing)
return {"prefix": prefix, "command": command, "params": params}
def _extract_nick(prefix: str) -> str:
"""Extract nickname from IRC prefix (nick!user@host)."""
return prefix.split("!")[0] if "!" in prefix else prefix
# ---------------------------------------------------------------------------
# IRC Adapter
# ---------------------------------------------------------------------------
class IRCAdapter(BasePlatformAdapter):
"""Async IRC adapter implementing the BasePlatformAdapter interface.
This class is instantiated by the adapter_factory passed to
register_platform().
"""
def __init__(self, config, **kwargs):
platform = Platform("irc")
super().__init__(config=config, platform=platform)
extra = getattr(config, "extra", {}) or {}
# Connection settings (env vars override config.yaml)
self.server = os.getenv("IRC_SERVER") or extra.get("server", "")
self.port = int(os.getenv("IRC_PORT") or extra.get("port", 6697))
self.nickname = os.getenv("IRC_NICKNAME") or extra.get("nickname", "hermes-bot")
self.channel = os.getenv("IRC_CHANNEL") or extra.get("channel", "")
self.use_tls = (
os.getenv("IRC_USE_TLS", "").lower() in ("1", "true", "yes")
if os.getenv("IRC_USE_TLS")
else extra.get("use_tls", True)
)
self.server_password = os.getenv("IRC_SERVER_PASSWORD") or extra.get("server_password", "")
self.nickserv_password = os.getenv("IRC_NICKSERV_PASSWORD") or extra.get("nickserv_password", "")
# Auth
self.allowed_users: list = extra.get("allowed_users", [])
# IRC limits
self.max_message_length = int(extra.get("max_message_length", 450))
# Runtime state
self._reader: Optional[asyncio.StreamReader] = None
self._writer: Optional[asyncio.StreamWriter] = None
self._recv_task: Optional[asyncio.Task] = None
self._current_nick = self.nickname
self._registered = False # IRC registration complete
self._registration_event = asyncio.Event()
@property
def name(self) -> str:
return "IRC"
# ── Connection lifecycle ──────────────────────────────────────────────
async def connect(self) -> bool:
"""Connect to the IRC server, register, and join the channel."""
if not self.server or not self.channel:
logger.error("IRC: server and channel must be configured")
self._set_fatal_error(
"config_missing",
"IRC_SERVER and IRC_CHANNEL must be set",
retryable=False,
)
return False
# Prevent two profiles from using the same IRC identity
try:
from gateway.status import acquire_scoped_lock, release_scoped_lock
lock_key = f"{self.server}:{self.nickname}"
if not acquire_scoped_lock("irc", lock_key):
logger.error("IRC: %s@%s already in use by another profile", self.nickname, self.server)
self._set_fatal_error("lock_conflict", "IRC identity in use by another profile", retryable=False)
return False
self._lock_key = lock_key
except ImportError:
self._lock_key = None # status module not available (e.g. tests)
try:
ssl_ctx = None
if self.use_tls:
ssl_ctx = ssl.create_default_context()
self._reader, self._writer = await asyncio.wait_for(
asyncio.open_connection(self.server, self.port, ssl=ssl_ctx),
timeout=30.0,
)
except Exception as e:
logger.error("IRC: failed to connect to %s:%s%s", self.server, self.port, e)
self._set_fatal_error("connect_failed", str(e), retryable=True)
return False
# IRC registration sequence
if self.server_password:
await self._send_raw(f"PASS {self.server_password}")
await self._send_raw(f"NICK {self.nickname}")
await self._send_raw(f"USER {self.nickname} 0 * :Hermes Agent")
# Start receive loop
self._recv_task = asyncio.create_task(self._receive_loop())
# Wait for registration (001 RPL_WELCOME) with timeout
try:
await asyncio.wait_for(self._registration_event.wait(), timeout=30.0)
except asyncio.TimeoutError:
logger.error("IRC: registration timed out")
await self.disconnect()
self._set_fatal_error("registration_timeout", "IRC server did not send RPL_WELCOME", retryable=True)
return False
# NickServ identification
if self.nickserv_password:
await self._send_raw(f"PRIVMSG NickServ :IDENTIFY {self.nickserv_password}")
await asyncio.sleep(2) # Give NickServ time to process
# Join channel
await self._send_raw(f"JOIN {self.channel}")
self._mark_connected()
logger.info("IRC: connected to %s:%s as %s, joined %s", self.server, self.port, self._current_nick, self.channel)
return True
async def disconnect(self) -> None:
"""Quit and close the connection."""
# Release the scoped lock so another profile can use this identity
if getattr(self, "_lock_key", None):
try:
from gateway.status import release_scoped_lock
release_scoped_lock("irc", self._lock_key)
except Exception:
pass
self._mark_disconnected()
if self._writer and not self._writer.is_closing():
try:
await self._send_raw("QUIT :Hermes Agent shutting down")
await asyncio.sleep(0.5)
except Exception:
pass
try:
self._writer.close()
await self._writer.wait_closed()
except Exception:
pass
if self._recv_task and not self._recv_task.done():
self._recv_task.cancel()
try:
await self._recv_task
except asyncio.CancelledError:
pass
self._reader = None
self._writer = None
self._registered = False
self._registration_event.clear()
# ── Sending ───────────────────────────────────────────────────────────
async def send(
self,
chat_id: str,
content: str,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
):
if not self._writer or self._writer.is_closing():
return SendResult(success=False, error="Not connected")
target = chat_id # channel name or nick for DMs
lines = self._split_message(content, target)
for line in lines:
try:
await self._send_raw(f"PRIVMSG {target} :{line}")
# Basic rate limiting to avoid excess flood
await asyncio.sleep(0.3)
except Exception as e:
return SendResult(success=False, error=str(e))
return SendResult(success=True, message_id=str(int(time.time() * 1000)))
async def send_typing(self, chat_id: str, metadata=None) -> None:
"""IRC has no typing indicator — no-op."""
pass
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
is_channel = chat_id.startswith("#") or chat_id.startswith("&")
return {
"name": chat_id,
"type": "group" if is_channel else "dm",
}
# ── Message splitting ─────────────────────────────────────────────────
def _split_message(self, content: str, target: str) -> List[str]:
"""Split a long message into IRC-safe chunks.
IRC has a ~512 byte line limit. After accounting for protocol
overhead (``PRIVMSG <target> :``), we split content into chunks.
"""
# Strip markdown formatting that doesn't render in IRC
content = self._strip_markdown(content)
overhead = len(f"PRIVMSG {target} :".encode("utf-8")) + 2 # +2 for \r\n
max_bytes = 510 - overhead
max_chars = min(self.max_message_length, max_bytes)
lines: List[str] = []
for paragraph in content.split("\n"):
if not paragraph.strip():
continue
while len(paragraph) > max_chars:
# Find a space to break at
split_at = paragraph.rfind(" ", 0, max_chars)
if split_at < max_chars // 3:
split_at = max_chars
lines.append(paragraph[:split_at])
paragraph = paragraph[split_at:].lstrip()
if paragraph.strip():
lines.append(paragraph)
return lines if lines else [""]
@staticmethod
def _strip_markdown(text: str) -> str:
"""Convert basic markdown to plain text for IRC."""
# Bold: **text** or __text__ → text
text = re.sub(r"\*\*(.+?)\*\*", r"\1", text)
text = re.sub(r"__(.+?)__", r"\1", text)
# Italic: *text* or _text_ → text
text = re.sub(r"\*(.+?)\*", r"\1", text)
text = re.sub(r"(?<!\w)_(.+?)_(?!\w)", r"\1", text)
# Inline code: `text` → text
text = re.sub(r"`(.+?)`", r"\1", text)
# Code blocks: ```...``` → content
text = re.sub(r"```\w*\n?", "", text)
# Images: ![alt](url) → url (must come BEFORE links)
text = re.sub(r"!\[([^\]]*)\]\(([^)]+)\)", r"\2", text)
# Links: [text](url) → text (url)
text = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", r"\1 (\2)", text)
return text
# ── Raw IRC I/O ──────────────────────────────────────────────────────
async def _send_raw(self, line: str) -> None:
"""Send a raw IRC protocol line."""
if not self._writer or self._writer.is_closing():
return
encoded = (line + "\r\n").encode("utf-8")
self._writer.write(encoded)
await self._writer.drain()
async def _receive_loop(self) -> None:
"""Main receive loop — reads lines and dispatches them."""
buffer = b""
try:
while self._reader and not self._reader.at_eof():
data = await self._reader.read(4096)
if not data:
break
buffer += data
while b"\r\n" in buffer:
line, buffer = buffer.split(b"\r\n", 1)
try:
decoded = line.decode("utf-8", errors="replace")
await self._handle_line(decoded)
except Exception as e:
logger.warning("IRC: error handling line: %s", e)
except asyncio.CancelledError:
raise
except Exception as e:
logger.error("IRC: receive loop error: %s", e)
finally:
if self.is_connected:
logger.warning("IRC: connection lost, marking disconnected")
self._set_fatal_error("connection_lost", "IRC connection closed unexpectedly", retryable=True)
await self._notify_fatal_error()
async def _handle_line(self, raw: str) -> None:
"""Dispatch a single IRC protocol line."""
msg = _parse_irc_message(raw)
command = msg["command"]
params = msg["params"]
# PING/PONG keepalive
if command == "PING":
payload = params[0] if params else ""
await self._send_raw(f"PONG :{payload}")
return
# RPL_WELCOME (001) — registration complete
if command == "001":
self._registered = True
self._registration_event.set()
if params:
# Server may confirm our nick in the first param
self._current_nick = params[0]
return
# ERR_NICKNAMEINUSE (433) — nick collision during registration
if command == "433":
self._current_nick = self.nickname + "_"
await self._send_raw(f"NICK {self._current_nick}")
return
# PRIVMSG — incoming message (channel or DM)
if command == "PRIVMSG" and len(params) >= 2:
sender_nick = _extract_nick(msg["prefix"])
target = params[0]
text = params[1]
# Ignore our own messages
if sender_nick.lower() == self._current_nick.lower():
return
# CTCP ACTION (/me) — convert to text
if text.startswith("\x01ACTION ") and text.endswith("\x01"):
text = f"* {sender_nick} {text[8:-1]}"
# Ignore other CTCP
if text.startswith("\x01"):
return
# Determine if this is a channel message or DM
is_channel = target.startswith("#") or target.startswith("&")
chat_id = target if is_channel else sender_nick
chat_type = "group" if is_channel else "dm"
# In channels, only respond if addressed (nick: or nick,)
if is_channel:
addressed = False
for prefix in (f"{self._current_nick}:", f"{self._current_nick},",
f"{self._current_nick} "):
if text.lower().startswith(prefix.lower()):
text = text[len(prefix):].strip()
addressed = True
break
if not addressed:
return # Ignore unaddressed channel messages
# Auth check
if self.allowed_users and sender_nick not in self.allowed_users:
logger.debug("IRC: ignoring message from unauthorized user %s", sender_nick)
return
await self._dispatch_message(
text=text,
chat_id=chat_id,
chat_type=chat_type,
user_id=sender_nick,
user_name=sender_nick,
)
# NICK — track our own nick changes
if command == "NICK" and _extract_nick(msg["prefix"]).lower() == self._current_nick.lower():
if params:
self._current_nick = params[0]
async def _dispatch_message(
self,
text: str,
chat_id: str,
chat_type: str,
user_id: str,
user_name: str,
) -> None:
"""Build a MessageEvent and hand it to the base class handler."""
if not self._message_handler:
return
source = self.build_source(
chat_id=chat_id,
chat_name=chat_id,
chat_type=chat_type,
user_id=user_id,
user_name=user_name,
)
event = MessageEvent(
text=text,
message_type=MessageType.TEXT,
source=source,
message_id=str(int(time.time() * 1000)),
timestamp=__import__("datetime").datetime.now(),
)
await self.handle_message(event)
# ---------------------------------------------------------------------------
# Plugin registration
# ---------------------------------------------------------------------------
def check_requirements() -> bool:
"""Check if IRC is configured.
Only requires the server and channel — no external pip packages needed.
"""
server = os.getenv("IRC_SERVER", "")
channel = os.getenv("IRC_CHANNEL", "")
# Also accept config.yaml-only configuration (no env vars).
# The gateway passes PlatformConfig; we just check env for the
# hermes setup / requirements check path.
return bool(server and channel)
def validate_config(config) -> bool:
"""Validate that the platform config has enough info to connect."""
extra = getattr(config, "extra", {}) or {}
server = os.getenv("IRC_SERVER") or extra.get("server", "")
channel = os.getenv("IRC_CHANNEL") or extra.get("channel", "")
return bool(server and channel)
def register(ctx):
"""Plugin entry point — called by the Hermes plugin system."""
ctx.register_platform(
name="irc",
label="IRC",
adapter_factory=lambda cfg: IRCAdapter(cfg),
check_fn=check_requirements,
validate_config=validate_config,
required_env=["IRC_SERVER", "IRC_CHANNEL", "IRC_NICKNAME"],
install_hint="No extra packages needed (stdlib only)",
# Auth env vars for _is_user_authorized() integration
allowed_users_env="IRC_ALLOWED_USERS",
allow_all_env="IRC_ALLOW_ALL_USERS",
# IRC line limit after protocol overhead
max_message_length=450,
# Display
emoji="💬",
# IRC doesn't have phone numbers to redact
pii_safe=False,
allow_update_command=True,
# LLM guidance
platform_hint=(
"You are chatting via IRC. IRC does not support markdown formatting "
"— use plain text only. Messages are limited to ~450 characters per "
"line (long messages are automatically split). In channels, users "
"address you by prefixing your nick. Keep responses concise and "
"conversational."
),
)

View File

@@ -3181,6 +3181,15 @@ class AIAgent:
platform_key = (self.platform or "").lower().strip()
if platform_key in PLATFORM_HINTS:
prompt_parts.append(PLATFORM_HINTS[platform_key])
elif platform_key:
# Check plugin registry for platform-specific LLM guidance
try:
from gateway.platform_registry import platform_registry
_entry = platform_registry.get(platform_key)
if _entry and _entry.platform_hint:
prompt_parts.append(_entry.platform_hint)
except Exception:
pass
return "\n\n".join(p.strip() for p in prompt_parts if p.strip())

View File

@@ -578,7 +578,9 @@ class TestDeliverResultErrorReturns:
with patch("gateway.config.load_gateway_config"):
result = _deliver_result(job, "Output.")
assert result is not None
assert "unknown platform" in result
# With the plugin platform registry, unknown names are accepted
# as potential plugins but fail at delivery if no adapter exists.
assert "fax" in result.lower()
def test_returns_error_when_platform_disabled(self):
from gateway.config import Platform

View File

@@ -285,11 +285,11 @@ class TestAuthorizationMaps(unittest.TestCase):
class TestSendMessageToolRouting(unittest.TestCase):
"""Verify email routing in send_message_tool."""
def test_email_in_platform_map(self):
import tools.send_message_tool as smt
import inspect
source = inspect.getsource(smt._handle_send)
self.assertIn('"email"', source)
def test_email_platform_resolves(self):
"""Email platform resolves via Platform() — no hardcoded map needed."""
from gateway.config import Platform
p = Platform("email")
self.assertEqual(p, Platform.EMAIL)
def test_send_to_platform_has_email_branch(self):
import tools.send_message_tool as smt
@@ -301,11 +301,12 @@ class TestSendMessageToolRouting(unittest.TestCase):
class TestCronDelivery(unittest.TestCase):
"""Verify email in cron scheduler platform_map."""
def test_email_in_cron_platform_map(self):
import cron.scheduler
import inspect
source = inspect.getsource(cron.scheduler)
self.assertIn('"email"', source)
def test_email_resolves_for_cron(self):
"""Email platform resolves via Platform() for cron delivery."""
from gateway.config import Platform
p = Platform("email")
self.assertEqual(p, Platform.EMAIL)
self.assertEqual(p.value, "email")
class TestToolset(unittest.TestCase):
@@ -334,10 +335,11 @@ class TestChannelDirectory(unittest.TestCase):
"""Verify email in channel directory session-based discovery."""
def test_email_in_session_discovery(self):
import gateway.channel_directory
import inspect
source = inspect.getsource(gateway.channel_directory.build_channel_directory)
self.assertIn('"email"', source)
"""Email is discovered via Platform enum iteration in build_channel_directory."""
from gateway.config import Platform
# Email is a built-in Platform member, so it's included in
# `for plat in Platform:` iteration inside build_channel_directory.
self.assertIn(Platform.EMAIL, Platform.__members__.values())
class TestGatewaySetup(unittest.TestCase):

View File

@@ -0,0 +1,380 @@
"""Tests for the IRC platform adapter plugin."""
import asyncio
import os
import sys
import pytest
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
# Ensure the plugins directory is on sys.path for direct import
_REPO_ROOT = Path(__file__).resolve().parents[2]
_IRC_PLUGIN_DIR = _REPO_ROOT / "plugins" / "platforms" / "irc"
if str(_IRC_PLUGIN_DIR) not in sys.path:
sys.path.insert(0, str(_IRC_PLUGIN_DIR))
# ── IRC protocol helpers ─────────────────────────────────────────────────
from adapter import _parse_irc_message, _extract_nick
class TestIRCProtocolHelpers:
def test_parse_simple_command(self):
msg = _parse_irc_message("PING :server.example.com")
assert msg["command"] == "PING"
assert msg["params"] == ["server.example.com"]
assert msg["prefix"] == ""
def test_parse_prefixed_message(self):
msg = _parse_irc_message(":nick!user@host PRIVMSG #channel :Hello world")
assert msg["prefix"] == "nick!user@host"
assert msg["command"] == "PRIVMSG"
assert msg["params"] == ["#channel", "Hello world"]
def test_parse_numeric_reply(self):
msg = _parse_irc_message(":server 001 hermes-bot :Welcome to IRC")
assert msg["prefix"] == "server"
assert msg["command"] == "001"
assert msg["params"] == ["hermes-bot", "Welcome to IRC"]
def test_parse_nick_collision(self):
msg = _parse_irc_message(":server 433 * hermes-bot :Nickname is already in use")
assert msg["command"] == "433"
def test_extract_nick_full_prefix(self):
assert _extract_nick("nick!user@host") == "nick"
def test_extract_nick_bare(self):
assert _extract_nick("server.example.com") == "server.example.com"
# ── IRC Adapter ──────────────────────────────────────────────────────────
from adapter import IRCAdapter, check_requirements, validate_config
class TestIRCAdapterInit:
def test_init_from_env(self, monkeypatch):
monkeypatch.setenv("IRC_SERVER", "irc.test.net")
monkeypatch.setenv("IRC_PORT", "6667")
monkeypatch.setenv("IRC_NICKNAME", "testbot")
monkeypatch.setenv("IRC_CHANNEL", "#test")
monkeypatch.setenv("IRC_USE_TLS", "false")
from gateway.config import PlatformConfig
cfg = PlatformConfig(enabled=True)
adapter = IRCAdapter(cfg)
assert adapter.server == "irc.test.net"
assert adapter.port == 6667
assert adapter.nickname == "testbot"
assert adapter.channel == "#test"
assert adapter.use_tls is False
def test_init_from_config_extra(self, monkeypatch):
# Clear any env vars
for key in ("IRC_SERVER", "IRC_PORT", "IRC_NICKNAME", "IRC_CHANNEL", "IRC_USE_TLS"):
monkeypatch.delenv(key, raising=False)
from gateway.config import PlatformConfig
cfg = PlatformConfig(
enabled=True,
extra={
"server": "irc.libera.chat",
"port": 6697,
"nickname": "hermes",
"channel": "#hermes-dev",
"use_tls": True,
},
)
adapter = IRCAdapter(cfg)
assert adapter.server == "irc.libera.chat"
assert adapter.port == 6697
assert adapter.nickname == "hermes"
assert adapter.channel == "#hermes-dev"
assert adapter.use_tls is True
def test_env_overrides_config(self, monkeypatch):
monkeypatch.setenv("IRC_SERVER", "env-server.net")
from gateway.config import PlatformConfig
cfg = PlatformConfig(
enabled=True,
extra={"server": "config-server.net", "channel": "#ch"},
)
adapter = IRCAdapter(cfg)
assert adapter.server == "env-server.net"
class TestIRCAdapterSend:
@pytest.fixture
def adapter(self, monkeypatch):
for key in ("IRC_SERVER", "IRC_PORT", "IRC_NICKNAME", "IRC_CHANNEL", "IRC_USE_TLS"):
monkeypatch.delenv(key, raising=False)
from gateway.config import PlatformConfig
cfg = PlatformConfig(
enabled=True,
extra={
"server": "localhost",
"port": 6667,
"nickname": "testbot",
"channel": "#test",
"use_tls": False,
},
)
return IRCAdapter(cfg)
@pytest.mark.asyncio
async def test_send_not_connected(self, adapter):
result = await adapter.send("#test", "hello")
assert result.success is False
assert "Not connected" in result.error
@pytest.mark.asyncio
async def test_send_success(self, adapter):
writer = MagicMock()
writer.is_closing = MagicMock(return_value=False)
writer.write = MagicMock()
writer.drain = AsyncMock()
adapter._writer = writer
result = await adapter.send("#test", "hello world")
assert result.success is True
assert result.message_id is not None
# Verify PRIVMSG was sent
writer.write.assert_called()
sent_data = writer.write.call_args[0][0]
assert b"PRIVMSG #test :hello world" in sent_data
@pytest.mark.asyncio
async def test_send_splits_long_messages(self, adapter):
writer = MagicMock()
writer.is_closing = MagicMock(return_value=False)
writer.write = MagicMock()
writer.drain = AsyncMock()
adapter._writer = writer
long_msg = "x" * 1000
result = await adapter.send("#test", long_msg)
assert result.success is True
# Should have been split into multiple PRIVMSG calls
assert writer.write.call_count > 1
class TestIRCAdapterMessageParsing:
@pytest.fixture
def adapter(self, monkeypatch):
for key in ("IRC_SERVER", "IRC_PORT", "IRC_NICKNAME", "IRC_CHANNEL", "IRC_USE_TLS"):
monkeypatch.delenv(key, raising=False)
from gateway.config import PlatformConfig
cfg = PlatformConfig(
enabled=True,
extra={
"server": "localhost",
"port": 6667,
"nickname": "hermes",
"channel": "#test",
"use_tls": False,
},
)
a = IRCAdapter(cfg)
a._current_nick = "hermes"
a._registered = True
return a
@pytest.mark.asyncio
async def test_handle_ping(self, adapter):
writer = MagicMock()
writer.is_closing = MagicMock(return_value=False)
writer.write = MagicMock()
writer.drain = AsyncMock()
adapter._writer = writer
await adapter._handle_line("PING :test-server")
sent = writer.write.call_args[0][0]
assert b"PONG :test-server" in sent
@pytest.mark.asyncio
async def test_handle_welcome(self, adapter):
adapter._registered = False
adapter._registration_event = asyncio.Event()
await adapter._handle_line(":server 001 hermes :Welcome to IRC")
assert adapter._registered is True
assert adapter._registration_event.is_set()
@pytest.mark.asyncio
async def test_handle_nick_collision(self, adapter):
writer = MagicMock()
writer.is_closing = MagicMock(return_value=False)
writer.write = MagicMock()
writer.drain = AsyncMock()
adapter._writer = writer
await adapter._handle_line(":server 433 * hermes :Nickname in use")
assert adapter._current_nick == "hermes_"
sent = writer.write.call_args[0][0]
assert b"NICK hermes_" in sent
@pytest.mark.asyncio
async def test_handle_addressed_channel_message(self, adapter):
"""Messages addressed to the bot (nick: msg) should be dispatched."""
handler = AsyncMock(return_value="response")
adapter._message_handler = handler
# Mock handle_message to capture the event
dispatched = []
original_dispatch = adapter._dispatch_message
async def capture_dispatch(**kwargs):
dispatched.append(kwargs)
adapter._dispatch_message = capture_dispatch
await adapter._handle_line(":user!u@host PRIVMSG #test :hermes: hello there")
assert len(dispatched) == 1
assert dispatched[0]["text"] == "hello there"
assert dispatched[0]["chat_id"] == "#test"
@pytest.mark.asyncio
async def test_ignores_unaddressed_channel_message(self, adapter):
dispatched = []
async def capture_dispatch(**kwargs):
dispatched.append(kwargs)
adapter._dispatch_message = capture_dispatch
adapter._message_handler = AsyncMock()
await adapter._handle_line(":user!u@host PRIVMSG #test :just talking")
assert len(dispatched) == 0
@pytest.mark.asyncio
async def test_handle_dm(self, adapter):
"""DMs (target == bot nick) should always be dispatched."""
dispatched = []
async def capture_dispatch(**kwargs):
dispatched.append(kwargs)
adapter._dispatch_message = capture_dispatch
adapter._message_handler = AsyncMock()
await adapter._handle_line(":user!u@host PRIVMSG hermes :private message")
assert len(dispatched) == 1
assert dispatched[0]["text"] == "private message"
assert dispatched[0]["chat_type"] == "dm"
assert dispatched[0]["chat_id"] == "user"
@pytest.mark.asyncio
async def test_ignores_own_messages(self, adapter):
dispatched = []
async def capture_dispatch(**kwargs):
dispatched.append(kwargs)
adapter._dispatch_message = capture_dispatch
adapter._message_handler = AsyncMock()
await adapter._handle_line(":hermes!bot@host PRIVMSG #test :my own msg")
assert len(dispatched) == 0
@pytest.mark.asyncio
async def test_ctcp_action_converted(self, adapter):
"""CTCP ACTION (/me) should be converted to text."""
dispatched = []
async def capture_dispatch(**kwargs):
dispatched.append(kwargs)
adapter._dispatch_message = capture_dispatch
adapter._message_handler = AsyncMock()
await adapter._handle_line(":user!u@host PRIVMSG hermes :\x01ACTION waves\x01")
assert len(dispatched) == 1
assert dispatched[0]["text"] == "* user waves"
class TestIRCAdapterMarkdown:
def test_strip_bold(self):
assert IRCAdapter._strip_markdown("**bold**") == "bold"
def test_strip_italic(self):
assert IRCAdapter._strip_markdown("*italic*") == "italic"
def test_strip_code(self):
assert IRCAdapter._strip_markdown("`code`") == "code"
def test_strip_link(self):
result = IRCAdapter._strip_markdown("[click here](https://example.com)")
assert result == "click here (https://example.com)"
def test_strip_image(self):
result = IRCAdapter._strip_markdown("![alt](https://example.com/img.png)")
assert result == "https://example.com/img.png"
# ── Requirements / validation ────────────────────────────────────────────
class TestIRCRequirements:
def test_check_requirements_with_env(self, monkeypatch):
monkeypatch.setenv("IRC_SERVER", "irc.test.net")
monkeypatch.setenv("IRC_CHANNEL", "#test")
assert check_requirements() is True
def test_check_requirements_missing_server(self, monkeypatch):
monkeypatch.delenv("IRC_SERVER", raising=False)
monkeypatch.setenv("IRC_CHANNEL", "#test")
assert check_requirements() is False
def test_check_requirements_missing_channel(self, monkeypatch):
monkeypatch.setenv("IRC_SERVER", "irc.test.net")
monkeypatch.delenv("IRC_CHANNEL", raising=False)
assert check_requirements() is False
def test_validate_config_from_extra(self, monkeypatch):
for key in ("IRC_SERVER", "IRC_CHANNEL"):
monkeypatch.delenv(key, raising=False)
from gateway.config import PlatformConfig
cfg = PlatformConfig(extra={"server": "irc.test.net", "channel": "#test"})
assert validate_config(cfg) is True
def test_validate_config_missing(self, monkeypatch):
for key in ("IRC_SERVER", "IRC_CHANNEL"):
monkeypatch.delenv(key, raising=False)
from gateway.config import PlatformConfig
cfg = PlatformConfig(extra={})
assert validate_config(cfg) is False
# ── Plugin registration ──────────────────────────────────────────────────
class TestIRCPluginRegistration:
"""Test the register() entry point."""
def test_register_adds_to_registry(self, monkeypatch):
monkeypatch.setenv("IRC_SERVER", "irc.test.net")
monkeypatch.setenv("IRC_CHANNEL", "#test")
from gateway.platform_registry import platform_registry
# Clean up if already registered
platform_registry.unregister("irc")
from adapter import register
ctx = MagicMock()
register(ctx)
ctx.register_platform.assert_called_once()
call_kwargs = ctx.register_platform.call_args
assert call_kwargs[1]["name"] == "irc" or call_kwargs[0][0] == "irc" if call_kwargs[0] else call_kwargs[1]["name"] == "irc"

View File

@@ -0,0 +1,377 @@
"""Tests for the platform adapter registry and dynamic Platform enum."""
import os
import pytest
from unittest.mock import MagicMock, patch
from dataclasses import dataclass
from gateway.platform_registry import PlatformRegistry, PlatformEntry, platform_registry
from gateway.config import Platform, PlatformConfig, GatewayConfig
# ── Platform enum dynamic members ─────────────────────────────────────────
class TestPlatformEnumDynamic:
"""Test that Platform enum accepts unknown values for plugin platforms."""
def test_builtin_members_still_work(self):
assert Platform.TELEGRAM.value == "telegram"
assert Platform("telegram") is Platform.TELEGRAM
def test_dynamic_member_created(self):
p = Platform("irc")
assert p.value == "irc"
assert p.name == "IRC"
def test_dynamic_member_identity_stable(self):
"""Same value returns same object (cached)."""
a = Platform("irc")
b = Platform("irc")
assert a is b
def test_dynamic_member_case_normalised(self):
"""Mixed case normalised to lowercase."""
a = Platform("IRC")
b = Platform("irc")
assert a is b
assert a.value == "irc"
def test_dynamic_member_with_hyphens(self):
p = Platform("my-platform")
assert p.value == "my-platform"
assert p.name == "MY_PLATFORM"
def test_dynamic_member_rejects_non_string(self):
with pytest.raises(ValueError):
Platform(123)
def test_dynamic_member_rejects_empty(self):
with pytest.raises(ValueError):
Platform("")
def test_dynamic_member_rejects_whitespace_only(self):
with pytest.raises(ValueError):
Platform(" ")
# ── PlatformRegistry ──────────────────────────────────────────────────────
class TestPlatformRegistry:
"""Test the PlatformRegistry itself."""
def _make_entry(self, name="test", check_ok=True, validate_ok=True, factory_ok=True):
adapter_mock = MagicMock()
return PlatformEntry(
name=name,
label=name.title(),
adapter_factory=lambda cfg, _m=adapter_mock: _m if factory_ok else (_ for _ in ()).throw(RuntimeError("factory error")),
check_fn=lambda: check_ok,
validate_config=lambda cfg: validate_ok,
required_env=[],
source="plugin",
), adapter_mock
def test_register_and_get(self):
reg = PlatformRegistry()
entry, _ = self._make_entry("alpha")
reg.register(entry)
assert reg.get("alpha") is entry
assert reg.is_registered("alpha")
def test_get_unknown_returns_none(self):
reg = PlatformRegistry()
assert reg.get("nonexistent") is None
def test_unregister(self):
reg = PlatformRegistry()
entry, _ = self._make_entry("beta")
reg.register(entry)
assert reg.unregister("beta") is True
assert reg.get("beta") is None
assert reg.unregister("beta") is False # already gone
def test_create_adapter_success(self):
reg = PlatformRegistry()
entry, mock_adapter = self._make_entry("gamma")
reg.register(entry)
result = reg.create_adapter("gamma", MagicMock())
assert result is mock_adapter
def test_create_adapter_unknown_name(self):
reg = PlatformRegistry()
assert reg.create_adapter("unknown", MagicMock()) is None
def test_create_adapter_check_fails(self):
reg = PlatformRegistry()
entry, _ = self._make_entry("delta", check_ok=False)
reg.register(entry)
assert reg.create_adapter("delta", MagicMock()) is None
def test_create_adapter_validate_fails(self):
reg = PlatformRegistry()
entry, _ = self._make_entry("epsilon", validate_ok=False)
reg.register(entry)
assert reg.create_adapter("epsilon", MagicMock()) is None
def test_create_adapter_factory_exception(self):
reg = PlatformRegistry()
entry = PlatformEntry(
name="broken",
label="Broken",
adapter_factory=lambda cfg: (_ for _ in ()).throw(RuntimeError("boom")),
check_fn=lambda: True,
validate_config=None,
source="plugin",
)
reg.register(entry)
# factory raises → create_adapter returns None instead of propagating
assert reg.create_adapter("broken", MagicMock()) is None
def test_create_adapter_no_validate(self):
"""When validate_config is None, skip validation."""
reg = PlatformRegistry()
mock_adapter = MagicMock()
entry = PlatformEntry(
name="novalidate",
label="NoValidate",
adapter_factory=lambda cfg: mock_adapter,
check_fn=lambda: True,
validate_config=None,
source="plugin",
)
reg.register(entry)
assert reg.create_adapter("novalidate", MagicMock()) is mock_adapter
def test_all_entries(self):
reg = PlatformRegistry()
e1, _ = self._make_entry("one")
e2, _ = self._make_entry("two")
reg.register(e1)
reg.register(e2)
names = {e.name for e in reg.all_entries()}
assert names == {"one", "two"}
def test_plugin_entries(self):
reg = PlatformRegistry()
plugin_entry, _ = self._make_entry("plugged")
builtin_entry = PlatformEntry(
name="core",
label="Core",
adapter_factory=lambda cfg: MagicMock(),
check_fn=lambda: True,
source="builtin",
)
reg.register(plugin_entry)
reg.register(builtin_entry)
plugin_names = {e.name for e in reg.plugin_entries()}
assert plugin_names == {"plugged"}
def test_re_register_replaces(self):
reg = PlatformRegistry()
entry1, mock1 = self._make_entry("dup")
entry2 = PlatformEntry(
name="dup",
label="Dup v2",
adapter_factory=lambda cfg: "v2",
check_fn=lambda: True,
source="plugin",
)
reg.register(entry1)
reg.register(entry2)
assert reg.get("dup").label == "Dup v2"
# ── GatewayConfig integration ────────────────────────────────────────────
class TestGatewayConfigPluginPlatform:
"""Test that GatewayConfig parses and validates plugin platforms."""
def test_from_dict_accepts_plugin_platform(self):
data = {
"platforms": {
"telegram": {"enabled": True, "token": "test-token"},
"irc": {"enabled": True, "extra": {"server": "irc.libera.chat"}},
}
}
cfg = GatewayConfig.from_dict(data)
platform_values = {p.value for p in cfg.platforms}
assert "telegram" in platform_values
assert "irc" in platform_values
def test_get_connected_platforms_includes_registered_plugin(self):
"""Plugin platform with registry entry passes get_connected_platforms."""
# Register a fake plugin platform
from gateway.platform_registry import platform_registry as _reg
test_entry = PlatformEntry(
name="testplat",
label="TestPlat",
adapter_factory=lambda cfg: MagicMock(),
check_fn=lambda: True,
validate_config=lambda cfg: bool(cfg.extra.get("token")),
source="plugin",
)
_reg.register(test_entry)
try:
data = {
"platforms": {
"testplat": {"enabled": True, "extra": {"token": "abc"}},
}
}
cfg = GatewayConfig.from_dict(data)
connected = cfg.get_connected_platforms()
connected_values = {p.value for p in connected}
assert "testplat" in connected_values
finally:
_reg.unregister("testplat")
def test_get_connected_platforms_excludes_unregistered_plugin(self):
"""Plugin platform without registry entry is excluded."""
data = {
"platforms": {
"unknown_plugin": {"enabled": True, "extra": {"token": "abc"}},
}
}
cfg = GatewayConfig.from_dict(data)
connected = cfg.get_connected_platforms()
connected_values = {p.value for p in connected}
assert "unknown_plugin" not in connected_values
def test_get_connected_platforms_excludes_invalid_config(self):
"""Plugin platform with failing validate_config is excluded."""
from gateway.platform_registry import platform_registry as _reg
test_entry = PlatformEntry(
name="badconfig",
label="BadConfig",
adapter_factory=lambda cfg: MagicMock(),
check_fn=lambda: True,
validate_config=lambda cfg: False, # always fails
source="plugin",
)
_reg.register(test_entry)
try:
data = {
"platforms": {
"badconfig": {"enabled": True, "extra": {}},
}
}
cfg = GatewayConfig.from_dict(data)
connected = cfg.get_connected_platforms()
connected_values = {p.value for p in connected}
assert "badconfig" not in connected_values
finally:
_reg.unregister("badconfig")
# ── Extended PlatformEntry fields ─────────────────────────────────────
class TestPlatformEntryExtendedFields:
"""Test the auth, message length, and display fields on PlatformEntry."""
def test_default_field_values(self):
entry = PlatformEntry(
name="test",
label="Test",
adapter_factory=lambda cfg: None,
check_fn=lambda: True,
)
assert entry.allowed_users_env == ""
assert entry.allow_all_env == ""
assert entry.max_message_length == 0
assert entry.pii_safe is False
assert entry.emoji == "🔌"
assert entry.allow_update_command is True
def test_custom_auth_fields(self):
entry = PlatformEntry(
name="irc",
label="IRC",
adapter_factory=lambda cfg: None,
check_fn=lambda: True,
allowed_users_env="IRC_ALLOWED_USERS",
allow_all_env="IRC_ALLOW_ALL_USERS",
max_message_length=450,
pii_safe=False,
emoji="💬",
)
assert entry.allowed_users_env == "IRC_ALLOWED_USERS"
assert entry.allow_all_env == "IRC_ALLOW_ALL_USERS"
assert entry.max_message_length == 450
assert entry.emoji == "💬"
# ── Cron platform resolution ─────────────────────────────────────────
class TestCronPlatformResolution:
"""Test that cron delivery accepts plugin platform names."""
def test_builtin_platform_resolves(self):
"""Built-in platform names resolve via Platform() call."""
p = Platform("telegram")
assert p is Platform.TELEGRAM
def test_plugin_platform_resolves(self):
"""Plugin platform names create dynamic enum members."""
p = Platform("irc")
assert p.value == "irc"
def test_invalid_platform_type_rejected(self):
"""Non-string values are still rejected."""
with pytest.raises(ValueError):
Platform(None)
# ── platforms.py integration ──────────────────────────────────────────
class TestPlatformsMerge:
"""Test get_all_platforms() merges with registry."""
def test_get_all_platforms_includes_builtins(self):
from hermes_cli.platforms import get_all_platforms, PLATFORMS
merged = get_all_platforms()
for key in PLATFORMS:
assert key in merged
def test_get_all_platforms_includes_plugin(self):
from hermes_cli.platforms import get_all_platforms
from gateway.platform_registry import platform_registry as _reg
_reg.register(PlatformEntry(
name="testmerge",
label="TestMerge",
adapter_factory=lambda cfg: None,
check_fn=lambda: True,
source="plugin",
emoji="🧪",
))
try:
merged = get_all_platforms()
assert "testmerge" in merged
assert "TestMerge" in merged["testmerge"].label
finally:
_reg.unregister("testmerge")
def test_platform_label_plugin_fallback(self):
from hermes_cli.platforms import platform_label
from gateway.platform_registry import platform_registry as _reg
_reg.register(PlatformEntry(
name="labeltest",
label="LabelTest",
adapter_factory=lambda cfg: None,
check_fn=lambda: True,
source="plugin",
emoji="🏷️",
))
try:
label = platform_label("labeltest")
assert "LabelTest" in label
finally:
_reg.unregister("labeltest")

View File

@@ -83,9 +83,12 @@ class TestSessionSourceRoundtrip:
assert restored.chat_topic is None
assert restored.chat_type == "dm"
def test_invalid_platform_raises(self):
with pytest.raises((ValueError, KeyError)):
SessionSource.from_dict({"platform": "nonexistent", "chat_id": "1"})
def test_unknown_platform_accepted_for_plugins(self):
"""Unknown platform names are now accepted (dynamic enum members for
plugin platforms), so from_dict should succeed rather than raise."""
source = SessionSource.from_dict({"platform": "nonexistent", "chat_id": "1"})
assert source.platform.value == "nonexistent"
assert source.chat_id == "1"
class TestSessionSourceDescription:

View File

@@ -145,28 +145,12 @@ def _handle_send(args):
except Exception as e:
return json.dumps(_error(f"Failed to load gateway config: {e}"))
platform_map = {
"telegram": Platform.TELEGRAM,
"discord": Platform.DISCORD,
"slack": Platform.SLACK,
"whatsapp": Platform.WHATSAPP,
"signal": Platform.SIGNAL,
"bluebubbles": Platform.BLUEBUBBLES,
"matrix": Platform.MATRIX,
"mattermost": Platform.MATTERMOST,
"homeassistant": Platform.HOMEASSISTANT,
"dingtalk": Platform.DINGTALK,
"feishu": Platform.FEISHU,
"wecom": Platform.WECOM,
"wecom_callback": Platform.WECOM_CALLBACK,
"weixin": Platform.WEIXIN,
"email": Platform.EMAIL,
"sms": Platform.SMS,
}
platform = platform_map.get(platform_name)
if not platform:
avail = ", ".join(platform_map.keys())
return tool_error(f"Unknown platform: {platform_name}. Available: {avail}")
# Accept any platform name — built-in names resolve to their enum
# member, plugin platform names create dynamic members via _missing_().
try:
platform = Platform(platform_name)
except (ValueError, KeyError):
return tool_error(f"Unknown platform: {platform_name}")
pconfig = config.platforms.get(platform)
if not pconfig or not pconfig.enabled:
@@ -314,6 +298,27 @@ def _maybe_skip_cron_duplicate_send(platform_name: str, chat_id: str, thread_id:
}
async def _send_via_adapter(platform, pconfig, chat_id, chunk):
"""Send a message via a live gateway adapter (for plugin platforms).
Falls back to error if no adapter is connected for this platform.
"""
try:
from gateway.run import _gateway_runner_ref
runner = _gateway_runner_ref()
if runner:
adapter = runner.adapters.get(platform)
if adapter:
from gateway.platforms.base import SendResult
result = await adapter.send(chat_id=chat_id, content=chunk)
if result.success:
return {"success": True, "message_id": result.message_id}
return {"error": f"Adapter send failed: {result.error}"}
except Exception as e:
return {"error": f"Plugin platform send failed: {e}"}
return {"error": f"No live adapter for platform '{platform.value}'. Is the gateway running with this platform connected?"}
async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, media_files=None):
"""Route a message to the appropriate platform sender.
@@ -352,6 +357,16 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None,
if _feishu_available:
_MAX_LENGTHS[Platform.FEISHU] = FeishuAdapter.MAX_MESSAGE_LENGTH
# Check plugin registry for max_message_length
if platform not in _MAX_LENGTHS:
try:
from gateway.platform_registry import platform_registry
entry = platform_registry.get(platform.value)
if entry and entry.max_message_length > 0:
_MAX_LENGTHS[platform] = entry.max_message_length
except Exception:
pass
# Smart-chunk the message to fit within platform limits.
# For short messages or platforms without a known limit this is a no-op.
max_len = _MAX_LENGTHS.get(platform)
@@ -425,7 +440,9 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None,
elif platform == Platform.BLUEBUBBLES:
result = await _send_bluebubbles(pconfig.extra, chat_id, chunk)
else:
result = {"error": f"Direct sending not yet implemented for {platform.value}"}
# Plugin platform — route through the gateway's live adapter
# if available, otherwise report the error.
result = await _send_via_adapter(platform, pconfig, chat_id, chunk)
if isinstance(result, dict) and result.get("error"):
return result

View File

@@ -452,6 +452,28 @@ def resolve_toolset(name: str, visited: Set[str] = None) -> List[str]:
return [e.name for e in registry._tools.values() if e.toolset == name]
except Exception:
pass
# Auto-generate a toolset for plugin platforms (hermes-<name>).
# Gives them _HERMES_CORE_TOOLS plus any tools the plugin registered
# into a toolset matching the platform name.
if name.startswith("hermes-"):
platform_name = name[len("hermes-"):]
try:
from gateway.platform_registry import platform_registry
if platform_registry.is_registered(platform_name):
plugin_tools = set(_HERMES_CORE_TOOLS)
try:
from tools.registry import registry
plugin_tools.update(
e.name for e in registry._tools.values()
if e.toolset == platform_name
)
except Exception:
pass
return list(plugin_tools)
except Exception:
pass
return []
# Collect direct tools

View File

@@ -7,7 +7,9 @@ sidebar_position: 9
This guide covers adding a new messaging platform to the Hermes gateway. A platform adapter connects Hermes to an external messaging service (Telegram, Discord, WeCom, etc.) so users can interact with the agent through that service.
:::tip
Adding a platform adapter touches 20+ files across code, config, and docs. Use this guide as a checklist — the adapter file itself is typically only 40% of the work.
There are two ways to add a platform:
- **Plugin** (recommended for community/third-party): Drop a plugin directory into `~/.hermes/plugins/` — zero core code changes needed. See [Plugin Path](#plugin-path-recommended) below.
- **Built-in**: Modify 20+ files across code, config, and docs. Use the [Built-in Checklist](#step-by-step-checklist) below.
:::
## Architecture Overview
@@ -26,7 +28,152 @@ Every adapter extends `BasePlatformAdapter` from `gateway/platforms/base.py` and
Inbound messages are received by the adapter and forwarded via `self.handle_message(event)`, which the base class routes to the gateway runner.
## Step-by-Step Checklist
## Plugin Path (Recommended)
The plugin system lets you add a platform adapter without modifying any core Hermes code. Your plugin is a directory with two files:
```
~/.hermes/plugins/my-platform/
PLUGIN.yaml # Plugin metadata
adapter.py # Adapter class + register() entry point
```
### PLUGIN.yaml
```yaml
name: my-platform
version: 1.0.0
description: My custom messaging platform adapter
requires_env:
- MY_PLATFORM_TOKEN
- MY_PLATFORM_CHANNEL
```
### adapter.py
```python
import os
from gateway.platforms.base import (
BasePlatformAdapter, SendResult, MessageEvent, MessageType,
)
from gateway.config import Platform, PlatformConfig
class MyPlatformAdapter(BasePlatformAdapter):
def __init__(self, config: PlatformConfig):
super().__init__(config, Platform("my_platform"))
extra = config.extra or {}
self.token = os.getenv("MY_PLATFORM_TOKEN") or extra.get("token", "")
async def connect(self) -> bool:
# Connect to the platform API, start listeners
self._mark_connected()
return True
async def disconnect(self) -> None:
self._mark_disconnected()
async def send(self, chat_id, content, reply_to=None, metadata=None):
# Send message via platform API
return SendResult(success=True, message_id="...")
async def get_chat_info(self, chat_id):
return {"name": chat_id, "type": "dm"}
def check_requirements() -> bool:
return bool(os.getenv("MY_PLATFORM_TOKEN"))
def validate_config(config) -> bool:
extra = getattr(config, "extra", {}) or {}
return bool(os.getenv("MY_PLATFORM_TOKEN") or extra.get("token"))
def register(ctx):
"""Plugin entry point — called by the Hermes plugin system."""
ctx.register_platform(
name="my_platform",
label="My Platform",
adapter_factory=lambda cfg: MyPlatformAdapter(cfg),
check_fn=check_requirements,
validate_config=validate_config,
required_env=["MY_PLATFORM_TOKEN"],
install_hint="pip install my-platform-sdk",
# Per-platform user authorization env vars
allowed_users_env="MY_PLATFORM_ALLOWED_USERS",
allow_all_env="MY_PLATFORM_ALLOW_ALL_USERS",
# Message length limit for smart chunking (0 = no limit)
max_message_length=4000,
# LLM guidance injected into system prompt
platform_hint=(
"You are chatting via My Platform. "
"It supports markdown formatting."
),
# Display
emoji="💬",
)
# Optional: register platform-specific tools
ctx.register_tool(
name="my_platform_search",
toolset="my_platform",
schema={...},
handler=my_search_handler,
)
```
### Configuration
Users configure the platform in `config.yaml`:
```yaml
gateway:
platforms:
my_platform:
enabled: true
extra:
token: "..."
channel: "#general"
```
Or via environment variables (which the adapter reads in `__init__`).
### What the Plugin System Handles Automatically
When you call `ctx.register_platform()`, the following integration points are handled for you — no core code changes needed:
| Integration point | How it works |
|---|---|
| Gateway adapter creation | Registry checked before built-in if/elif chain |
| Config parsing | `Platform._missing_()` accepts any platform name |
| Connected platform validation | Registry `validate_config()` called |
| User authorization | `allowed_users_env` / `allow_all_env` checked |
| Cron delivery | `Platform()` resolves any registered name |
| send_message tool | Routes through live gateway adapter |
| Webhook cross-platform delivery | Registry checked for known platforms |
| `/update` command access | `allow_update_command` flag |
| Channel directory | Plugin platforms included in enumeration |
| System prompt hints | `platform_hint` injected into LLM context |
| Message chunking | `max_message_length` for smart splitting |
| PII redaction | `pii_safe` flag |
| `hermes status` | Shows plugin platforms with `(plugin)` tag |
| `hermes gateway setup` | Plugin platforms appear in setup menu |
| `hermes tools` / `hermes skills` | Plugin platforms in per-platform config |
| Token lock (multi-profile) | Use `acquire_scoped_lock()` in your `connect()` |
| Orphaned config warning | Descriptive log when plugin is missing |
### Reference Implementation
See `plugins/platforms/irc/` in the repo for a complete working example — a full async IRC adapter with zero external dependencies.
---
## Step-by-Step Checklist (Built-in Path)
:::note
This checklist is for adding a platform directly to the Hermes core codebase — typically done by core contributors for officially supported platforms. Community/third-party platforms should use the [Plugin Path](#plugin-path-recommended) above.
:::
### 1. Platform Enum