mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-06 10:47:12 +08:00
Slash commands (_run_simple_slash, _handle_thread_create_slash) bypassed every DISCORD_ALLOWED_* gate enforced by on_message. Any guild member could invoke /background (RCE via terminal), /restart, /model, /skill, etc. CVSS 9.8 Critical. - _evaluate_slash_authorization mirrors on_message gates (user, role, channel, ignored channel) with fail-closed semantics - _check_slash_authorization sends ephemeral reject + logs + admin alert - Auth gate runs before defer() so rejections are ephemeral - /skill autocomplete returns [] for unauthorized users (no catalog leak) - Component views (ExecApproval, SlashConfirm, UpdatePrompt, ModelPicker) now honor role allowlists via shared _component_check_auth helper - Optional DISCORD_HIDE_SLASH_COMMANDS defense-in-depth - Cross-platform admin alert (Telegram/Slack fallback) on unauthorized attempts Based on PR #18125 by @0xyg3n.
738 lines
28 KiB
Python
738 lines
28 KiB
Python
"""Security regression tests: slash commands honor on_message authorization gates.
|
|
|
|
Slash invocations (``_run_simple_slash``, ``_handle_thread_create_slash``)
|
|
historically bypassed every gate ``on_message`` enforces — DISCORD_ALLOWED_USERS,
|
|
DISCORD_ALLOWED_ROLES, DISCORD_ALLOWED_CHANNELS, DISCORD_IGNORED_CHANNELS.
|
|
Any guild member could invoke ``/background``, ``/restart``, etc. as the
|
|
operator. ``_check_slash_authorization`` mirrors all four gates one-for-one.
|
|
|
|
These tests pin the security-correct behavior so the bypass cannot regress.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import sys
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from gateway.config import PlatformConfig
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Discord module mock — borrowed from test_discord_slash_commands.py so this
|
|
# file runs on machines without discord.py installed.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _ensure_discord_mock():
|
|
if "discord" in sys.modules and hasattr(sys.modules["discord"], "__file__"):
|
|
return # real discord installed
|
|
|
|
if sys.modules.get("discord") is None:
|
|
discord_mod = MagicMock()
|
|
discord_mod.Intents.default.return_value = MagicMock()
|
|
discord_mod.DMChannel = type("DMChannel", (), {})
|
|
discord_mod.Thread = type("Thread", (), {})
|
|
discord_mod.ForumChannel = type("ForumChannel", (), {})
|
|
discord_mod.Interaction = object
|
|
|
|
class _FakePermissions:
|
|
def __init__(self, value=0, **_):
|
|
self.value = value
|
|
|
|
discord_mod.Permissions = _FakePermissions
|
|
|
|
class _FakeGroup:
|
|
def __init__(self, *, name, description, parent=None):
|
|
self.name = name
|
|
self.description = description
|
|
self.parent = parent
|
|
self._children: dict[str, object] = {}
|
|
if parent is not None:
|
|
parent.add_command(self)
|
|
|
|
def add_command(self, cmd):
|
|
self._children[cmd.name] = cmd
|
|
|
|
class _FakeCommand:
|
|
def __init__(self, *, name, description, callback, parent=None):
|
|
self.name = name
|
|
self.description = description
|
|
self.callback = callback
|
|
self.parent = parent
|
|
self.default_permissions = None
|
|
|
|
discord_mod.app_commands = SimpleNamespace(
|
|
describe=lambda **kwargs: (lambda fn: fn),
|
|
choices=lambda **kwargs: (lambda fn: fn),
|
|
autocomplete=lambda **kwargs: (lambda fn: fn),
|
|
Choice=lambda **kwargs: SimpleNamespace(**kwargs),
|
|
Group=_FakeGroup,
|
|
Command=_FakeCommand,
|
|
)
|
|
|
|
ext_mod = MagicMock()
|
|
commands_mod = MagicMock()
|
|
commands_mod.Bot = MagicMock
|
|
ext_mod.commands = commands_mod
|
|
|
|
sys.modules["discord"] = discord_mod
|
|
sys.modules.setdefault("discord.ext", ext_mod)
|
|
sys.modules.setdefault("discord.ext.commands", commands_mod)
|
|
|
|
|
|
_ensure_discord_mock()
|
|
|
|
from gateway.platforms.discord import DiscordAdapter # noqa: E402
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _isolate_discord_env(monkeypatch):
|
|
for var in (
|
|
"DISCORD_ALLOWED_USERS",
|
|
"DISCORD_ALLOWED_ROLES",
|
|
"DISCORD_ALLOWED_CHANNELS",
|
|
"DISCORD_IGNORED_CHANNELS",
|
|
"DISCORD_HIDE_SLASH_COMMANDS",
|
|
"DISCORD_ALLOW_BOTS",
|
|
):
|
|
monkeypatch.delenv(var, raising=False)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _stub_discord_permissions(monkeypatch):
|
|
"""Pin discord.Permissions to a plain stand-in so tests can assert the
|
|
bitfield value regardless of whether real discord.py or a sibling test
|
|
module's MagicMock is loaded."""
|
|
import discord
|
|
|
|
class _Perm:
|
|
def __init__(self, value=0, **_):
|
|
self.value = value
|
|
|
|
monkeypatch.setattr(discord, "Permissions", _Perm)
|
|
|
|
|
|
@pytest.fixture
|
|
def adapter():
|
|
config = PlatformConfig(enabled=True, token="***")
|
|
a = DiscordAdapter(config)
|
|
a._client = SimpleNamespace(user=SimpleNamespace(id=99999, name="HermesBot"), guilds=[])
|
|
return a
|
|
|
|
|
|
_SENTINEL = object()
|
|
|
|
|
|
def _make_interaction(
|
|
user_id, *, channel_id=12345, guild_id=42, in_dm=False, in_thread=False,
|
|
parent_channel_id=None, user=_SENTINEL,
|
|
):
|
|
"""Build a mock Discord Interaction with a still-unresponded response.
|
|
|
|
``channel_id`` may be set to ``None`` to simulate a guild interaction
|
|
payload missing a resolvable channel id (fail-closed exercise).
|
|
Pass ``user=None`` to simulate a payload missing the user object.
|
|
"""
|
|
import discord
|
|
|
|
response = SimpleNamespace(send_message=AsyncMock(), defer=AsyncMock())
|
|
|
|
if in_dm:
|
|
channel = discord.DMChannel()
|
|
elif in_thread:
|
|
channel = discord.Thread()
|
|
channel.id = channel_id
|
|
channel.parent_id = parent_channel_id
|
|
elif channel_id is None:
|
|
channel = None
|
|
else:
|
|
channel = SimpleNamespace(id=channel_id)
|
|
|
|
if user is _SENTINEL:
|
|
user_obj = SimpleNamespace(id=int(user_id), name=f"user_{user_id}")
|
|
else:
|
|
user_obj = user
|
|
|
|
return SimpleNamespace(
|
|
user=user_obj,
|
|
guild=SimpleNamespace(owner_id=999),
|
|
guild_id=guild_id,
|
|
channel_id=channel_id,
|
|
channel=channel,
|
|
response=response,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Backwards-compat: empty allowlist → everything passes (matches on_message)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_allowlist_allows_everyone(adapter):
|
|
"""SECURITY-CRITICAL backwards-compat: deployments without any allowlist
|
|
env vars set must see ZERO behavior change. on_message lets everyone
|
|
through in this case (returns True at line 1890); slash must do the same.
|
|
"""
|
|
interaction = _make_interaction("999999999")
|
|
assert await adapter._check_slash_authorization(interaction, "/help") is True
|
|
interaction.response.send_message.assert_not_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_allowlist_dm_also_allowed(adapter):
|
|
"""Same for DMs — no allowlist means no restriction, matching on_message."""
|
|
interaction = _make_interaction("999999999", in_dm=True)
|
|
assert await adapter._check_slash_authorization(interaction, "/help") is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# User allowlist (DISCORD_ALLOWED_USERS) parity
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_allowed_user_passes(adapter):
|
|
adapter._allowed_user_ids = {"100200300"}
|
|
interaction = _make_interaction("100200300")
|
|
assert await adapter._check_slash_authorization(interaction, "/background hi") is True
|
|
interaction.response.send_message.assert_not_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_disallowed_user_rejected_with_ephemeral(adapter, caplog):
|
|
adapter._allowed_user_ids = {"100200300"}
|
|
interaction = _make_interaction("999999999")
|
|
with caplog.at_level(logging.WARNING):
|
|
assert await adapter._check_slash_authorization(interaction, "/background hi") is False
|
|
interaction.response.send_message.assert_awaited_once()
|
|
args, kwargs = interaction.response.send_message.call_args
|
|
assert kwargs.get("ephemeral") is True
|
|
assert "not authorized" in (args[0] if args else kwargs.get("content", "")).lower()
|
|
assert any("Unauthorized slash attempt" in r.message for r in caplog.records)
|
|
assert any("DISCORD_ALLOWED_USERS" in r.message for r in caplog.records)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Role allowlist (DISCORD_ALLOWED_ROLES) parity
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_role_member_passes(adapter):
|
|
"""A user whose Member.roles includes an allowed role passes the gate."""
|
|
adapter._allowed_role_ids = {1234}
|
|
interaction = _make_interaction("999999999")
|
|
interaction.user.roles = [SimpleNamespace(id=1234)]
|
|
assert await adapter._check_slash_authorization(interaction, "/help") is True
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_role_non_member_rejected(adapter):
|
|
"""A user without any matching role is rejected even if no user allowlist."""
|
|
adapter._allowed_role_ids = {1234}
|
|
interaction = _make_interaction("999999999")
|
|
interaction.user.roles = [SimpleNamespace(id=9999)] # different role
|
|
assert await adapter._check_slash_authorization(interaction, "/help") is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Channel allowlist (DISCORD_ALLOWED_CHANNELS) parity — the gate prajer used
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_channel_not_in_allowlist_rejected(adapter, monkeypatch, caplog):
|
|
"""on_message blocks messages in channels not in DISCORD_ALLOWED_CHANNELS;
|
|
slash must do the same. This is the EXACT bypass prajer exploited.
|
|
"""
|
|
monkeypatch.setenv("DISCORD_ALLOWED_CHANNELS", "1111,2222")
|
|
interaction = _make_interaction("100200300", channel_id=9999)
|
|
with caplog.at_level(logging.WARNING):
|
|
assert await adapter._check_slash_authorization(interaction, "/background hi") is False
|
|
assert any("DISCORD_ALLOWED_CHANNELS" in r.message for r in caplog.records)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_channel_in_allowlist_passes(adapter, monkeypatch):
|
|
monkeypatch.setenv("DISCORD_ALLOWED_CHANNELS", "1111,2222")
|
|
interaction = _make_interaction("100200300", channel_id=1111)
|
|
assert await adapter._check_slash_authorization(interaction, "/help") is True
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_channel_allowlist_wildcard_passes(adapter, monkeypatch):
|
|
"""``*`` in DISCORD_ALLOWED_CHANNELS = allow any channel, matching on_message."""
|
|
monkeypatch.setenv("DISCORD_ALLOWED_CHANNELS", "*")
|
|
interaction = _make_interaction("100200300", channel_id=9999)
|
|
assert await adapter._check_slash_authorization(interaction, "/help") is True
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_channel_allowlist_does_not_apply_to_dms(adapter, monkeypatch):
|
|
"""DMs aren't channel-gated — they go through on_message's DM lockdown."""
|
|
monkeypatch.setenv("DISCORD_ALLOWED_CHANNELS", "1111")
|
|
interaction = _make_interaction("100200300", in_dm=True)
|
|
assert await adapter._check_slash_authorization(interaction, "/help") is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Channel blocklist (DISCORD_IGNORED_CHANNELS) parity
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_ignored_channel_rejected(adapter, monkeypatch, caplog):
|
|
monkeypatch.setenv("DISCORD_IGNORED_CHANNELS", "9999")
|
|
interaction = _make_interaction("100200300", channel_id=9999)
|
|
with caplog.at_level(logging.WARNING):
|
|
assert await adapter._check_slash_authorization(interaction, "/help") is False
|
|
assert any("DISCORD_IGNORED_CHANNELS" in r.message for r in caplog.records)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_ignored_channel_wildcard_blocks_all(adapter, monkeypatch):
|
|
monkeypatch.setenv("DISCORD_IGNORED_CHANNELS", "*")
|
|
interaction = _make_interaction("100200300", channel_id=9999)
|
|
assert await adapter._check_slash_authorization(interaction, "/help") is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cross-platform admin notification
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unauthorized_attempt_notifies_telegram(adapter):
|
|
from gateway.session import Platform
|
|
|
|
telegram_adapter = SimpleNamespace(send=AsyncMock())
|
|
home = SimpleNamespace(chat_id="987654321")
|
|
runner = SimpleNamespace(
|
|
adapters={Platform.TELEGRAM: telegram_adapter},
|
|
config=SimpleNamespace(get_home_channel=lambda p: home if p is Platform.TELEGRAM else None),
|
|
)
|
|
adapter.gateway_runner = runner
|
|
adapter._allowed_user_ids = {"100200300"}
|
|
|
|
interaction = _make_interaction("999999999")
|
|
await adapter._check_slash_authorization(interaction, "/background hi")
|
|
|
|
# Notify is fire-and-forget — let the scheduled task run.
|
|
await asyncio.sleep(0)
|
|
await asyncio.sleep(0)
|
|
|
|
telegram_adapter.send.assert_awaited_once()
|
|
chat_id, msg = telegram_adapter.send.call_args.args
|
|
assert chat_id == "987654321"
|
|
assert "Unauthorized" in msg
|
|
assert "999999999" in msg
|
|
assert "/background hi" in msg
|
|
assert "DISCORD_ALLOWED_USERS" in msg
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_notify_silently_no_ops_without_runner(adapter):
|
|
adapter.gateway_runner = None
|
|
await adapter._notify_unauthorized_slash("u", "1", 2, 3, "/x", "reason") # must not raise
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_notify_falls_back_to_slack_if_no_telegram(adapter):
|
|
from gateway.session import Platform
|
|
|
|
slack_adapter = SimpleNamespace(send=AsyncMock())
|
|
home_slack = SimpleNamespace(chat_id="C12345")
|
|
runner = SimpleNamespace(
|
|
adapters={Platform.SLACK: slack_adapter},
|
|
config=SimpleNamespace(
|
|
get_home_channel=lambda p: home_slack if p is Platform.SLACK else None,
|
|
),
|
|
)
|
|
adapter.gateway_runner = runner
|
|
await adapter._notify_unauthorized_slash("u", "1", 2, 3, "/x", "reason")
|
|
slack_adapter.send.assert_awaited_once()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Opt-in visibility hide
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_visibility_hide_off_by_default_is_noop(adapter, monkeypatch):
|
|
"""DISCORD_HIDE_SLASH_COMMANDS unset → don't touch any command's permissions."""
|
|
cmd = SimpleNamespace(name="x", default_permissions="UNCHANGED")
|
|
tree = SimpleNamespace(get_commands=lambda: [cmd])
|
|
|
|
# Re-run the registration tail logic by calling the bit that decides:
|
|
# we don't have a clean way to simulate the env-gated branch from
|
|
# _register_slash_commands, so we just confirm the helper itself works
|
|
# AND assert the env-gating logic is correct.
|
|
assert os.environ.get("DISCORD_HIDE_SLASH_COMMANDS") is None
|
|
# Helper should still work when called directly:
|
|
adapter._apply_owner_only_visibility(tree)
|
|
# When called directly the helper applies — env gating is at the call site,
|
|
# which we exercise in an integration-style test below.
|
|
|
|
|
|
def test_visibility_hide_helper_zeroes_perms(adapter):
|
|
cmd_a = SimpleNamespace(name="a", default_permissions=None)
|
|
cmd_b = SimpleNamespace(name="b", default_permissions=None)
|
|
tree = SimpleNamespace(get_commands=lambda: [cmd_a, cmd_b])
|
|
adapter._apply_owner_only_visibility(tree)
|
|
assert cmd_a.default_permissions is not None
|
|
assert cmd_b.default_permissions is not None
|
|
assert cmd_a.default_permissions.value == 0
|
|
assert cmd_b.default_permissions.value == 0
|
|
|
|
|
|
def test_visibility_hide_tolerates_unsetable_command(adapter, caplog):
|
|
class _Frozen:
|
|
__slots__ = ("name",)
|
|
def __init__(self, name):
|
|
self.name = name
|
|
|
|
cmd_ok = SimpleNamespace(name="ok", default_permissions=None)
|
|
cmd_bad = _Frozen("bad")
|
|
tree = SimpleNamespace(get_commands=lambda: [cmd_bad, cmd_ok])
|
|
|
|
with caplog.at_level(logging.DEBUG):
|
|
adapter._apply_owner_only_visibility(tree)
|
|
|
|
assert cmd_ok.default_permissions.value == 0
|
|
|
|
|
|
# os import for test_visibility_hide_off_by_default_is_noop
|
|
import os # noqa: E402
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fail-closed parity on malformed slash auth context
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_missing_channel_id_rejected_when_channel_policy_configured(
|
|
adapter, monkeypatch,
|
|
):
|
|
"""A guild interaction without a resolvable channel id must fail
|
|
closed when DISCORD_ALLOWED_CHANNELS is configured. Without this
|
|
guard the entire channel-policy block silently fell through."""
|
|
monkeypatch.setenv("DISCORD_ALLOWED_CHANNELS", "1111,2222")
|
|
interaction = _make_interaction("100200300", channel_id=None)
|
|
assert await adapter._check_slash_authorization(interaction, "/help") is False
|
|
interaction.response.send_message.assert_awaited_once()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_missing_channel_id_allowed_when_no_channel_policy(adapter):
|
|
"""No DISCORD_ALLOWED_CHANNELS configured + missing channel id: still
|
|
pass through the channel block (matches no-allowlist default)."""
|
|
interaction = _make_interaction("100200300", channel_id=None)
|
|
assert await adapter._check_slash_authorization(interaction, "/help") is True
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_missing_user_rejected_when_allowlist_configured(adapter):
|
|
"""interaction.user is None with a user/role allowlist active:
|
|
fail closed without raising AttributeError."""
|
|
adapter._allowed_user_ids = {"100200300"}
|
|
interaction = _make_interaction("100200300", user=None)
|
|
# Must not raise — must return False with an ephemeral rejection
|
|
assert await adapter._check_slash_authorization(interaction, "/help") is False
|
|
interaction.response.send_message.assert_awaited_once()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_missing_user_allowed_when_no_allowlist_configured(adapter):
|
|
"""interaction.user is None but no allowlist configured: allow
|
|
(preserves no-allowlist back-compat -- anyone is allowed when no
|
|
policy is in effect)."""
|
|
interaction = _make_interaction("100200300", user=None)
|
|
assert await adapter._check_slash_authorization(interaction, "/help") is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Thread parent channel allowlist parity
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_thread_parent_in_allowlist_passes(adapter, monkeypatch):
|
|
"""Thread whose parent channel is on DISCORD_ALLOWED_CHANNELS passes
|
|
even though the thread id itself isn't on the list."""
|
|
monkeypatch.setenv("DISCORD_ALLOWED_CHANNELS", "5555")
|
|
interaction = _make_interaction(
|
|
"100200300", channel_id=9999, in_thread=True, parent_channel_id=5555,
|
|
)
|
|
assert await adapter._check_slash_authorization(interaction, "/help") is True
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_thread_parent_in_ignorelist_rejects(adapter, monkeypatch):
|
|
"""Thread whose parent channel is on DISCORD_IGNORED_CHANNELS rejects
|
|
even when the thread id itself isn't ignored."""
|
|
monkeypatch.setenv("DISCORD_IGNORED_CHANNELS", "5555")
|
|
interaction = _make_interaction(
|
|
"100200300", channel_id=9999, in_thread=True, parent_channel_id=5555,
|
|
)
|
|
assert await adapter._check_slash_authorization(interaction, "/help") is False
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_ignored_beats_allowed(adapter, monkeypatch):
|
|
"""Channel listed in BOTH allowed and ignored: the ignored entry wins.
|
|
Anything else would be a foot-gun where adding to ignored does nothing
|
|
if the channel is also explicitly allowed."""
|
|
monkeypatch.setenv("DISCORD_ALLOWED_CHANNELS", "1111")
|
|
monkeypatch.setenv("DISCORD_IGNORED_CHANNELS", "1111")
|
|
interaction = _make_interaction("100200300", channel_id=1111)
|
|
assert await adapter._check_slash_authorization(interaction, "/help") is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Admin notify soft-fail fallback
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_notify_falls_back_to_slack_on_telegram_soft_fail(adapter):
|
|
"""adapter.send returning SendResult(success=False) must NOT short-
|
|
circuit the fallback chain. Treating a soft failure as delivered
|
|
means a Telegram outage swallows alerts silently."""
|
|
from gateway.session import Platform
|
|
|
|
soft_fail = SimpleNamespace(success=False, error="rate limited")
|
|
telegram_adapter = SimpleNamespace(send=AsyncMock(return_value=soft_fail))
|
|
slack_adapter = SimpleNamespace(send=AsyncMock())
|
|
home_tg = SimpleNamespace(chat_id="987654321")
|
|
home_sl = SimpleNamespace(chat_id="C12345")
|
|
homes = {Platform.TELEGRAM: home_tg, Platform.SLACK: home_sl}
|
|
runner = SimpleNamespace(
|
|
adapters={
|
|
Platform.TELEGRAM: telegram_adapter,
|
|
Platform.SLACK: slack_adapter,
|
|
},
|
|
config=SimpleNamespace(get_home_channel=lambda p: homes.get(p)),
|
|
)
|
|
adapter.gateway_runner = runner
|
|
|
|
await adapter._notify_unauthorized_slash("u", "1", 2, 3, "/x", "reason")
|
|
|
|
telegram_adapter.send.assert_awaited_once()
|
|
slack_adapter.send.assert_awaited_once()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_notify_returns_on_telegram_truthy_success(adapter):
|
|
"""adapter.send returning SendResult(success=True) -- or any object
|
|
without a falsy success attribute -- should still short-circuit at
|
|
Telegram. (This guards against the soft-fail patch over-correcting.)"""
|
|
from gateway.session import Platform
|
|
|
|
ok = SimpleNamespace(success=True, message_id="m1")
|
|
telegram_adapter = SimpleNamespace(send=AsyncMock(return_value=ok))
|
|
slack_adapter = SimpleNamespace(send=AsyncMock())
|
|
home_tg = SimpleNamespace(chat_id="987654321")
|
|
home_sl = SimpleNamespace(chat_id="C12345")
|
|
homes = {Platform.TELEGRAM: home_tg, Platform.SLACK: home_sl}
|
|
runner = SimpleNamespace(
|
|
adapters={
|
|
Platform.TELEGRAM: telegram_adapter,
|
|
Platform.SLACK: slack_adapter,
|
|
},
|
|
config=SimpleNamespace(get_home_channel=lambda p: homes.get(p)),
|
|
)
|
|
adapter.gateway_runner = runner
|
|
|
|
await adapter._notify_unauthorized_slash("u", "1", 2, 3, "/x", "reason")
|
|
|
|
telegram_adapter.send.assert_awaited_once()
|
|
slack_adapter.send.assert_not_awaited()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# /skill autocomplete + callback gating
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _capture_skill_registration(adapter, monkeypatch, entries):
|
|
"""Run ``_register_skill_group`` against a stubbed skill catalog and
|
|
return ``(handler_callback, autocomplete_callback)``.
|
|
|
|
The autocomplete callback is captured by monkeypatching
|
|
``discord.app_commands.autocomplete`` -- the production decorator is
|
|
a no-op stub in this test file's discord mock, so capturing the
|
|
callback through it is the direct route in tests.
|
|
"""
|
|
import discord
|
|
|
|
captured: dict = {}
|
|
|
|
def fake_categories(reserved_names):
|
|
# Match discord_skill_commands_by_category's tuple shape:
|
|
# (categories_dict, uncategorized_list, hidden_count)
|
|
return ({}, list(entries), 0)
|
|
|
|
import hermes_cli.commands as _hc
|
|
monkeypatch.setattr(
|
|
_hc, "discord_skill_commands_by_category", fake_categories,
|
|
)
|
|
|
|
def capture_autocomplete(**kwargs):
|
|
# Only one autocomplete in /skill registration: name=...
|
|
captured["autocomplete"] = kwargs.get("name")
|
|
|
|
def _passthrough(fn):
|
|
return fn
|
|
|
|
return _passthrough
|
|
|
|
monkeypatch.setattr(
|
|
discord.app_commands, "autocomplete", capture_autocomplete,
|
|
raising=False,
|
|
)
|
|
|
|
registered: list = []
|
|
|
|
class _Tree:
|
|
def get_commands(self):
|
|
return []
|
|
|
|
def add_command(self, cmd):
|
|
registered.append(cmd)
|
|
|
|
adapter._register_skill_group(_Tree())
|
|
assert registered, "_register_skill_group did not register a command"
|
|
return registered[0].callback, captured["autocomplete"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_skill_autocomplete_returns_empty_for_unauthorized(
|
|
adapter, monkeypatch,
|
|
):
|
|
"""Autocomplete must not leak the installed skill catalog to users
|
|
who can't run /skill. With DISCORD_ALLOWED_USERS configured and the
|
|
interaction user outside it, the autocomplete callback returns []."""
|
|
adapter._allowed_user_ids = {"100200300"}
|
|
entries = [
|
|
("alpha", "First skill", "/alpha"),
|
|
("beta", "Second skill", "/beta"),
|
|
]
|
|
_handler, autocomplete = _capture_skill_registration(
|
|
adapter, monkeypatch, entries,
|
|
)
|
|
|
|
interaction = _make_interaction("999999999")
|
|
result = await autocomplete(interaction, "")
|
|
assert result == []
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_skill_autocomplete_returns_choices_for_authorized(
|
|
adapter, monkeypatch,
|
|
):
|
|
"""Sanity: an authorized user still gets the autocomplete suggestions."""
|
|
adapter._allowed_user_ids = {"100200300"}
|
|
entries = [
|
|
("alpha", "First skill", "/alpha"),
|
|
("beta", "Second skill", "/beta"),
|
|
]
|
|
_handler, autocomplete = _capture_skill_registration(
|
|
adapter, monkeypatch, entries,
|
|
)
|
|
|
|
interaction = _make_interaction("100200300")
|
|
result = await autocomplete(interaction, "")
|
|
assert len(result) == 2
|
|
assert {choice.value for choice in result} == {"alpha", "beta"}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_skill_handler_rejects_before_dispatch_for_unauthorized(
|
|
adapter, monkeypatch,
|
|
):
|
|
"""The /skill handler must call _check_slash_authorization BEFORE
|
|
skill_lookup. Otherwise unknown vs known names produce divergent
|
|
responses ("Unknown skill: foo" vs auth rejection) which is a
|
|
catalog-probing oracle."""
|
|
adapter._allowed_user_ids = {"100200300"}
|
|
entries = [("alpha", "First skill", "/alpha")]
|
|
handler, _autocomplete = _capture_skill_registration(
|
|
adapter, monkeypatch, entries,
|
|
)
|
|
|
|
# Patch _run_simple_slash so we can detect any leak through it.
|
|
dispatched: list = []
|
|
|
|
async def fake_dispatch(_interaction, text):
|
|
dispatched.append(text)
|
|
|
|
adapter._run_simple_slash = fake_dispatch # type: ignore[assignment]
|
|
|
|
interaction = _make_interaction("999999999")
|
|
await handler(interaction, "alpha", "")
|
|
|
|
interaction.response.send_message.assert_awaited_once()
|
|
args, kwargs = interaction.response.send_message.call_args
|
|
assert kwargs.get("ephemeral") is True
|
|
assert "not authorized" in (
|
|
args[0] if args else kwargs.get("content", "")
|
|
).lower()
|
|
# Critically: nothing was dispatched, and the auth message did NOT
|
|
# mention the skill name "alpha" (no catalog leak).
|
|
assert dispatched == []
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_skill_handler_known_and_unknown_produce_same_rejection(
|
|
adapter, monkeypatch,
|
|
):
|
|
"""An unauthorized user probing for valid skill names must see the
|
|
same rejection text regardless of whether the name they tried is
|
|
on the registered catalog."""
|
|
adapter._allowed_user_ids = {"100200300"}
|
|
entries = [("alpha", "First skill", "/alpha")]
|
|
handler, _ = _capture_skill_registration(adapter, monkeypatch, entries)
|
|
|
|
adapter._run_simple_slash = AsyncMock() # type: ignore[assignment]
|
|
|
|
known_interaction = _make_interaction("999999999")
|
|
unknown_interaction = _make_interaction("999999999")
|
|
await handler(known_interaction, "alpha", "")
|
|
await handler(unknown_interaction, "definitely-not-a-skill", "")
|
|
|
|
known_interaction.response.send_message.assert_awaited_once()
|
|
unknown_interaction.response.send_message.assert_awaited_once()
|
|
known_args, known_kwargs = known_interaction.response.send_message.call_args
|
|
unknown_args, unknown_kwargs = (
|
|
unknown_interaction.response.send_message.call_args
|
|
)
|
|
assert known_args == unknown_args
|
|
assert known_kwargs == unknown_kwargs
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_skill_handler_dispatches_for_authorized(
|
|
adapter, monkeypatch,
|
|
):
|
|
"""Sanity: an authorized user reaches _run_simple_slash with the
|
|
resolved cmd_key and arguments."""
|
|
adapter._allowed_user_ids = {"100200300"}
|
|
entries = [("alpha", "First skill", "/alpha")]
|
|
handler, _ = _capture_skill_registration(adapter, monkeypatch, entries)
|
|
|
|
dispatched: list = []
|
|
|
|
async def fake_dispatch(_interaction, text):
|
|
dispatched.append(text)
|
|
|
|
adapter._run_simple_slash = fake_dispatch # type: ignore[assignment]
|
|
|
|
interaction = _make_interaction("100200300")
|
|
await handler(interaction, "alpha", "extra args")
|
|
assert dispatched == ["/alpha extra args"]
|