fix(telegram): authorize update prompt callbacks

This commit is contained in:
Junass1
2026-04-15 04:09:14 +03:00
committed by Teknium
parent 18396af31e
commit 096260ce78
3 changed files with 74 additions and 12 deletions

View File

@@ -163,6 +163,15 @@ class TelegramAdapter(BasePlatformAdapter):
# Approval button state: message_id → session_key
self._approval_state: Dict[int, str] = {}
@staticmethod
def _is_callback_user_authorized(user_id: str) -> bool:
"""Return whether a Telegram inline-button caller may perform gated actions."""
allowed_csv = os.getenv("TELEGRAM_ALLOWED_USERS", "").strip()
if not allowed_csv:
return True
allowed_ids = {uid.strip() for uid in allowed_csv.split(",") if uid.strip()}
return "*" in allowed_ids or user_id in allowed_ids
def _fallback_ips(self) -> list[str]:
"""Return validated fallback IPs from config (populated by _apply_env_overrides)."""
configured = self.config.extra.get("fallback_ips", []) if getattr(self.config, "extra", None) else []
@@ -1440,12 +1449,9 @@ class TelegramAdapter(BasePlatformAdapter):
# Only authorized users may click approval buttons.
caller_id = str(getattr(query.from_user, "id", ""))
allowed_csv = os.getenv("TELEGRAM_ALLOWED_USERS", "").strip()
if allowed_csv:
allowed_ids = {uid.strip() for uid in allowed_csv.split(",") if uid.strip()}
if "*" not in allowed_ids and caller_id not in allowed_ids:
await query.answer(text="⛔ You are not authorized to approve commands.")
return
if not self._is_callback_user_authorized(caller_id):
await query.answer(text="⛔ You are not authorized to approve commands.")
return
session_key = self._approval_state.pop(approval_id, None)
if not session_key:
@@ -1490,6 +1496,10 @@ class TelegramAdapter(BasePlatformAdapter):
if not data.startswith("update_prompt:"):
return
answer = data.split(":", 1)[1] # "y" or "n"
caller_id = str(getattr(query.from_user, "id", ""))
if not self._is_callback_user_authorized(caller_id):
await query.answer(text="⛔ You are not authorized to answer update prompts.")
return
await query.answer(text=f"Sent '{answer}' to the update process.")
# Edit the message to show the choice and remove buttons
label = "Yes" if answer == "y" else "No"

View File

@@ -191,7 +191,7 @@ AUTHOR_MAP = {
"yangzhi.see@gmail.com": "SeeYangZhi",
"yongtenglei@gmail.com": "yongtenglei",
"young@YoungdeMacBook-Pro.local": "YoungYang963",
"ysfalweshcan@gmail.com": "Awsh1",
"ysfalweshcan@gmail.com": "Junass1",
"ysfwaxlycan@gmail.com": "WAXLYY",
"yusufalweshdemir@gmail.com": "Dusk1e",
"zhouboli@gmail.com": "zhouboli",

View File

@@ -263,7 +263,7 @@ class TestTelegramApprovalCallback:
mock_resolve.assert_not_called()
@pytest.mark.asyncio
async def test_update_prompt_callback_not_affected(self):
async def test_update_prompt_callback_not_affected(self, tmp_path):
"""Ensure update prompt callbacks still work."""
adapter = _make_adapter()
@@ -281,11 +281,63 @@ class TestTelegramApprovalCallback:
context = MagicMock()
with patch("tools.approval.resolve_gateway_approval") as mock_resolve:
with patch("hermes_constants.get_hermes_home", return_value=Path("/tmp/test")):
try:
with patch("hermes_constants.get_hermes_home", return_value=tmp_path):
with patch.dict(os.environ, {"TELEGRAM_ALLOWED_USERS": ""}):
await adapter._handle_callback_query(update, context)
except Exception:
pass # May fail on file write, that's fine
# Should NOT have triggered approval resolution
mock_resolve.assert_not_called()
assert (tmp_path / ".update_response").read_text() == "y"
@pytest.mark.asyncio
async def test_update_prompt_callback_rejects_unauthorized_user(self, tmp_path):
"""Update prompt buttons should honor TELEGRAM_ALLOWED_USERS."""
adapter = _make_adapter()
query = AsyncMock()
query.data = "update_prompt:y"
query.message = MagicMock()
query.message.chat_id = 12345
query.from_user = MagicMock()
query.from_user.id = 222
query.answer = AsyncMock()
query.edit_message_text = AsyncMock()
update = MagicMock()
update.callback_query = query
context = MagicMock()
with patch("hermes_constants.get_hermes_home", return_value=tmp_path):
with patch.dict(os.environ, {"TELEGRAM_ALLOWED_USERS": "111"}):
await adapter._handle_callback_query(update, context)
query.answer.assert_called_once()
assert "not authorized" in query.answer.call_args[1]["text"].lower()
query.edit_message_text.assert_not_called()
assert not (tmp_path / ".update_response").exists()
@pytest.mark.asyncio
async def test_update_prompt_callback_allows_authorized_user(self, tmp_path):
"""Allowed Telegram users can still answer update prompt buttons."""
adapter = _make_adapter()
query = AsyncMock()
query.data = "update_prompt:n"
query.message = MagicMock()
query.message.chat_id = 12345
query.from_user = MagicMock()
query.from_user.id = 111
query.answer = AsyncMock()
query.edit_message_text = AsyncMock()
update = MagicMock()
update.callback_query = query
context = MagicMock()
with patch("hermes_constants.get_hermes_home", return_value=tmp_path):
with patch.dict(os.environ, {"TELEGRAM_ALLOWED_USERS": "111"}):
await adapter._handle_callback_query(update, context)
query.answer.assert_called_once()
query.edit_message_text.assert_called_once()
assert (tmp_path / ".update_response").read_text() == "n"