Compare commits

...

1 Commits

Author SHA1 Message Date
liuhao1024
8e9280c710 fix(gateway): redact credentials from approval prompts before sending to clients (#48456)
Tirith redacts its own findings, but the approval-request callbacks built the
operator prompt from the RAW command string, so a credential-shaped value
Tirith flagged was sent verbatim to clients, undoing the redaction one layer up.

THREE egress transports carried the leak; all fixed via a shared module-level
seam _redact_approval_command() (redact_sensitive_text force=True):
  1. chat platforms — _approval_notify_sync (gateway/run.py): redact before
     both the button path (send_exec_approval) and the plain-text /approve
     fallback.
  2. SSE/API stream — _approval_notify (gateway/platforms/api_server.py):
     redact event['command'] before it is enqueued to API/desktop clients.
  3. TUI JSON-RPC — three register_gateway_notify callbacks in
     tui_gateway/server.py emitted the raw approval_data to the TUI client;
     route them through a new _emit_approval_request() helper that redacts
     payload['command'] before _emit. (whole-bug-class: all sibling transports.)

force=True so the prompt — a hard secret-egress boundary — honors redaction
even when security.redact_secrets is off. Clean commands pass through unchanged.

Tests bind the seam (real credential patterns, force-when-disabled), assert all
three callbacks redact before their send/enqueue/emit sink (AST contract that
rejects a discarded-result call; behavior test for the TUI helper; a guard that
no registration emits the raw payload). All mutation-checked.

Co-authored-by: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com>
2026-06-22 23:01:06 +05:30
4 changed files with 231 additions and 3 deletions

View File

@@ -3964,6 +3964,14 @@ class APIServerAdapter(BasePlatformAdapter):
def _approval_notify(approval_data: Dict[str, Any]) -> None:
event = dict(approval_data or {})
# Redact credentials from the command before it enters the
# SSE/API event stream — same egress bug as #48456, second
# transport: API/desktop clients would otherwise receive the
# raw command Tirith flagged. Reuse the gateway seam.
if "command" in event:
from gateway.run import _redact_approval_command
event["command"] = _redact_approval_command(event.get("command"))
event.update({
"event": "approval.request",
"run_id": run_id,

View File

@@ -295,6 +295,22 @@ def _redact_gateway_user_facing_secrets(text: str) -> str:
return redacted
def _redact_approval_command(cmd: "str | None") -> str:
"""Redact credentials from a command before it goes into an approval prompt.
Tirith's *findings* are already redacted, but the gateway approval prompt
is built from the raw command string, so a credential-shaped value Tirith
flagged would otherwise be echoed verbatim to the chat platform (#48456).
Uses ``redact_sensitive_text(force=True)`` — the same Tirith-grade redactor
— so the prompt honors redaction even when ``security.redact_secrets`` is
off. Module-level so the wiring is unit-testable (the call site is a deeply
nested gateway closure that cannot be driven directly).
"""
from agent.redact import redact_sensitive_text
return redact_sensitive_text(str(cmd or ""), force=True)
def _gateway_provider_error_reply(text: str) -> str:
"""Map raw provider/API errors to a short user-safe Telegram reply."""
if _GATEWAY_AUTH_ERROR_RE.search(text):
@@ -15746,6 +15762,14 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
cmd = approval_data.get("command", "")
desc = approval_data.get("description", "dangerous command")
# Redact credentials from the command before displaying it in
# the approval prompt — Tirith's findings are already redacted,
# but the raw command string still leaks secrets to the chat
# platform (#48456). Applied here so BOTH the button-based
# (send_exec_approval) and plain-text fallback paths below use
# the redacted value.
cmd = _redact_approval_command(cmd)
# Prefer button-based approval when the adapter supports it.
# Check the *class* for the method, not the instance — avoids
# false positives from MagicMock auto-attribute creation in tests.

View File

@@ -0,0 +1,182 @@
"""Regression test for approval prompt credential redaction (issue #48456).
When Tirith flags a command for containing a credential-shaped pattern, the
gateway approval prompt must redact the credential from the command text
before sending it to the chat platform. Without this fix, the raw command
(with the credential in plaintext) is sent verbatim to Telegram/Discord/etc.,
undoing Tirith's redaction one layer up.
The redaction is wired through the module-level ``_redact_approval_command``
seam. These tests bind that seam -- the production wiring -- not just the
underlying ``redact_sensitive_text`` helper, so they fail if the redaction
call is removed from either approval path.
Credential fixtures are built at runtime from a benign prefix + a run of
``X`` characters (the same trick tests/agent/test_redact.py uses): they match
the redactor regexes so the assertions stay meaningful, but contain no real
or real-looking key, so secret scanners do not flag this file.
"""
from gateway.run import _redact_approval_command
# Synthetic, scanner-safe credential fixtures. Each matches its redactor
# regex (ghp_/sk-/JWT) but is unmistakably fake -- a run of X's, never a
# real or real-format key.
_FAKE_GHP = "ghp_" + "X" * 36
_FAKE_OPENAI = "sk-proj-" + "X" * 40
_FAKE_JWT = "eyJ" + "X" * 20 + "." + "eyJ" + "X" * 24 + "." + "X" * 30
class TestRedactApprovalCommand:
"""Contract for the approval-prompt redaction seam used by the gateway."""
def test_redacts_github_pat(self):
raw = "curl -H 'Authorization: token " + _FAKE_GHP + "' https://api.github.com/user"
out = _redact_approval_command(raw)
assert _FAKE_GHP not in out
# command structure preserved so the operator can still judge the action
assert "curl" in out
assert "github.com" in out
def test_redacts_openai_key(self):
raw = "export OPENAI_API_KEY=" + _FAKE_OPENAI + " && python s.py"
out = _redact_approval_command(raw)
assert _FAKE_OPENAI not in out
assert "python s.py" in out
def test_redacts_bearer_token(self):
raw = "curl -H 'Authorization: Bearer " + _FAKE_JWT + "' https://api.example.com"
out = _redact_approval_command(raw)
assert _FAKE_JWT not in out
def test_clean_command_passes_through_unchanged(self):
raw = "ls -la /tmp && echo hello"
assert _redact_approval_command(raw) == raw
def test_forces_redaction_even_when_disabled(self, monkeypatch):
"""force=True must redact even if security.redact_secrets is off -- the
approval prompt is a hard secret-egress boundary regardless of config."""
raw = "curl -H 'Authorization: token " + _FAKE_GHP + "' https://api.github.com"
# With redaction globally disabled, the seam must STILL redact (force=True).
monkeypatch.setattr("agent.redact._REDACT_ENABLED", False, raising=False)
out = _redact_approval_command(raw)
assert _FAKE_GHP not in out
def test_handles_none_and_empty(self):
assert _redact_approval_command("") == ""
assert _redact_approval_command(None) == ""
class TestApprovalCommandWiring:
"""Guard the production wiring on BOTH approval-notify transports:
1. the chat-platform path (_approval_notify_sync in gateway/run.py), and
2. the SSE/API path (_approval_notify in gateway/platforms/api_server.py),
each of which must route the command through _redact_approval_command and
REASSIGN the redacted value before any send/enqueue (so the raw command
cannot reach a client). Uses AST (not char-offset string slicing) so a
benign refactor doesn't cause a false failure, and so a discarded-result
call (`_redact(cmd); send(cmd)`) does NOT pass."""
def _assert_redacts_then_uses(self, module, func_name: str, sink_substr: str):
"""Parse `module`'s full AST, locate the (possibly nested) function
`func_name`, and assert it contains an assignment
`<x> = _redact_approval_command(...)` whose result is then used by a
statement matching `sink_substr` on a LATER line. Walking the real AST
(not a source slice) is refactor-robust and rejects discarded-result
calls (the call must be an assignment, not a bare expression)."""
import ast
import inspect
source = inspect.getsource(module)
tree = ast.parse(source)
target_fn = None
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == func_name:
target_fn = node
break
assert target_fn is not None, f"function {func_name} not found in {module.__name__}"
redact_line = None
for node in ast.walk(target_fn):
if isinstance(node, ast.Assign) and isinstance(node.value, ast.Call):
fn = node.value.func
if isinstance(fn, ast.Name) and fn.id == "_redact_approval_command":
redact_line = node.lineno
assert redact_line is not None, (
f"{func_name} must assign the result of _redact_approval_command(...) "
"(a discarded-result call would still leak the raw command)"
)
sink_line = None
for node in ast.walk(target_fn):
seg = ast.get_source_segment(source, node)
if seg and sink_substr in seg and getattr(node, "lineno", 0) > redact_line:
sink_line = node.lineno
break
assert sink_line is not None, (
f"`{sink_substr}` sink not found after the redaction in {func_name}"
)
def test_chat_platform_path_redacts_before_send(self):
import gateway.run as run
self._assert_redacts_then_uses(run, "_approval_notify_sync", "send_exec_approval")
def test_sse_api_path_redacts_before_enqueue(self):
from gateway.platforms import api_server
self._assert_redacts_then_uses(api_server, "_approval_notify", "put_nowait")
def test_tui_path_helper_redacts_before_emit(self):
# The TUI transport redacts in the module-level _emit_approval_request
# helper (which the three register_gateway_notify lambdas all call).
from tui_gateway import server as tui_server
self._assert_redacts_then_uses(tui_server, "_emit_approval_request", "_emit(")
class TestTuiApprovalEmitRedaction:
"""Behavior contract for the TUI approval transport (#48456, 3rd transport):
_emit_approval_request must emit a payload whose `command` is redacted, and
every register_gateway_notify lambda in tui_gateway must route through it
(not raw _emit) so the unredacted command never reaches the TUI client."""
def test_emit_approval_request_redacts_command_in_payload(self, monkeypatch):
from tui_gateway import server as tui_server
emitted = {}
monkeypatch.setattr(
tui_server, "_emit",
lambda event, sid, payload=None: emitted.update(
{"event": event, "sid": sid, "payload": payload}
),
)
raw = "curl -H 'Authorization: token ghp_01...6789' https://api.github.com"
tui_server._emit_approval_request("sess-1", {"command": raw, "description": "x"})
assert emitted["event"] == "approval.request"
assert "ghp_01...6789" not in emitted["payload"]["command"]
# non-command fields and command structure preserved
assert emitted["payload"]["description"] == "x"
assert "github.com" in emitted["payload"]["command"]
def test_no_raw_command_emit_in_approval_registrations(self):
"""No register_gateway_notify callback may emit the raw payload via
`_emit("approval.request", ...)` directly — all must go through the
redacting `_emit_approval_request` helper."""
import inspect
from tui_gateway import server as tui_server
src = inspect.getsource(tui_server)
# The ONLY allowed raw _emit("approval.request", ...) is inside the
# redacting helper itself; every other occurrence is a leak.
raw_emits = src.count('_emit("approval.request"')
# one inside _emit_approval_request; the lambdas use _emit_approval_request
assert raw_emits == 1, (
f"expected exactly 1 raw _emit(\"approval.request\") (inside the "
f"redacting helper), found {raw_emits} — a registration may be "
f"emitting the unredacted command"
)
assert "_emit_approval_request(sid, data)" in src, (
"registration lambdas must route through _emit_approval_request"
)

View File

@@ -806,6 +806,20 @@ def _emit(event: str, sid: str, payload: dict | None = None):
write_json({"jsonrpc": "2.0", "method": "event", "params": params})
def _emit_approval_request(sid: str, data: dict | None) -> None:
"""Emit an ``approval.request`` event to the TUI client with the command
redacted. The approval payload is built from the RAW command string, so a
credential-shaped value Tirith flagged would otherwise be echoed verbatim
to the TUI client (#48456 — third egress transport alongside the chat
platforms and the SSE/API stream). Reuse the shared gateway seam."""
payload = dict(data or {})
if "command" in payload:
from gateway.run import _redact_approval_command
payload["command"] = _redact_approval_command(payload.get("command"))
_emit("approval.request", sid, payload)
def _status_update(sid: str, kind: str, text: str | None = None):
body = (text if text is not None else kind).strip()
if not body:
@@ -1040,7 +1054,7 @@ def _start_agent_build(sid: str, session: dict) -> None:
)
register_gateway_notify(
key, lambda data: _emit("approval.request", sid, data)
key, lambda data: _emit_approval_request(sid, data)
)
notify_registered = True
load_permanent_allowlist()
@@ -2554,7 +2568,7 @@ def _sync_session_key_after_compress(
try:
register_gateway_notify(
new_session_id,
lambda data: _emit("approval.request", sid, data),
lambda data: _emit_approval_request(sid, data),
)
except Exception:
pass
@@ -3916,7 +3930,7 @@ def _init_session(
try:
from tools.approval import register_gateway_notify, load_permanent_allowlist
register_gateway_notify(key, lambda data: _emit("approval.request", sid, data))
register_gateway_notify(key, lambda data: _emit_approval_request(sid, data))
load_permanent_allowlist()
except Exception:
pass