diff --git a/cli.py b/cli.py index 48a71b16817..d76ce01bc32 100644 --- a/cli.py +++ b/cli.py @@ -1240,8 +1240,73 @@ def _cprint(text: str): Raw ANSI escapes written via print() are swallowed by patch_stdout's StdoutProxy. Routing through print_formatted_text(ANSI(...)) lets prompt_toolkit parse the escapes and render real colors. + + When called from a background thread while a prompt_toolkit + ``Application`` is running (the common case for the self-improvement + background review's ``๐Ÿ’พ โ€ฆ`` summary, curator summaries, and other + bg-thread emissions), a direct ``_pt_print`` races with the input + area's redraw and the line can end up visually buried behind the + prompt. Route those cases through ``run_in_terminal`` via + ``loop.call_soon_threadsafe``, which pauses the input area, prints + the line above it, and redraws the prompt cleanly. """ - _pt_print(_PT_ANSI(text)) + try: + from prompt_toolkit.application import get_app_or_none, run_in_terminal + except Exception: + _pt_print(_PT_ANSI(text)) + return + + app = None + try: + app = get_app_or_none() + except Exception: + app = None + + # No active app, or we're already on the app's main thread: the + # direct prompt_toolkit print is safe and matches existing behavior + # (spinner frames, streamed tokens, tool activity prefixes, โ€ฆ). + if app is None or not getattr(app, "_is_running", False): + _pt_print(_PT_ANSI(text)) + return + + try: + loop = app.loop # type: ignore[attr-defined] + except Exception: + loop = None + if loop is None: + _pt_print(_PT_ANSI(text)) + return + + import asyncio as _asyncio + try: + current_loop = _asyncio.get_event_loop_policy().get_event_loop() + except Exception: + current_loop = None + # Same thread as the app's loop โ†’ safe to print directly. + if current_loop is loop and loop.is_running(): + _pt_print(_PT_ANSI(text)) + return + + # Cross-thread emission: ask the app's event loop to schedule a + # ``run_in_terminal`` that wraps ``_pt_print``. This hides the + # prompt, prints, and redraws. Fire-and-forget โ€” if scheduling + # fails we fall back to a direct print so the line isn't lost. + def _schedule(): + try: + run_in_terminal(lambda: _pt_print(_PT_ANSI(text))) + except Exception: + try: + _pt_print(_PT_ANSI(text)) + except Exception: + pass + + try: + loop.call_soon_threadsafe(_schedule) + except Exception: + try: + _pt_print(_PT_ANSI(text)) + except Exception: + pass # --------------------------------------------------------------------------- diff --git a/run_agent.py b/run_agent.py index 0c64bfe50d0..c652ebfc98c 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3593,11 +3593,15 @@ class AIAgent: if actions: summary = " ยท ".join(dict.fromkeys(actions)) - self._safe_print(f" ๐Ÿ’พ {summary}") + self._safe_print( + f" ๐Ÿ’พ Self-improvement review: {summary}" + ) _bg_cb = self.background_review_callback if _bg_cb: try: - _bg_cb(f"๐Ÿ’พ {summary}") + _bg_cb( + f"๐Ÿ’พ Self-improvement review: {summary}" + ) except Exception: pass diff --git a/tests/cli/test_cprint_bg_thread.py b/tests/cli/test_cprint_bg_thread.py new file mode 100644 index 00000000000..3b5db534929 --- /dev/null +++ b/tests/cli/test_cprint_bg_thread.py @@ -0,0 +1,206 @@ +"""Tests for cli._cprint's bg-thread cooperation with prompt_toolkit. + +Background: when a prompt_toolkit Application is running, a bg thread that +calls ``_pt_print`` directly can race with the input-area redraw and the +printed line can end up visually buried behind the prompt. ``_cprint`` now +routes cross-thread prints through ``run_in_terminal`` via +``loop.call_soon_threadsafe`` so the self-improvement background review's +``๐Ÿ’พ Self-improvement review: โ€ฆ`` summary actually surfaces to the user. + +These tests verify the routing logic without spinning up a real PT app. +""" + +from __future__ import annotations + +import sys +import types +from types import SimpleNamespace + +import cli + + +def test_cprint_no_app_direct_print(monkeypatch): + """No active app โ†’ direct _pt_print, no run_in_terminal involvement.""" + calls = [] + monkeypatch.setattr(cli, "_pt_print", lambda x: calls.append(("pt_print", x))) + monkeypatch.setattr(cli, "_PT_ANSI", lambda t: ("ANSI", t)) + + # Patch the prompt_toolkit import the function performs internally. + fake_pt_app = types.ModuleType("prompt_toolkit.application") + fake_pt_app.get_app_or_none = lambda: None + fake_pt_app.run_in_terminal = lambda *a, **kw: calls.append(("run_in_terminal",)) + monkeypatch.setitem(sys.modules, "prompt_toolkit.application", fake_pt_app) + + cli._cprint("hello") + + assert calls == [("pt_print", ("ANSI", "hello"))] + + +def test_cprint_app_not_running_direct_print(monkeypatch): + """App exists but not running (e.g. teardown) โ†’ direct print.""" + calls = [] + monkeypatch.setattr(cli, "_pt_print", lambda x: calls.append(("pt_print", x))) + monkeypatch.setattr(cli, "_PT_ANSI", lambda t: t) + + fake_app = SimpleNamespace(_is_running=False, loop=None) + fake_pt_app = types.ModuleType("prompt_toolkit.application") + fake_pt_app.get_app_or_none = lambda: fake_app + fake_pt_app.run_in_terminal = lambda *a, **kw: calls.append(("run_in_terminal",)) + monkeypatch.setitem(sys.modules, "prompt_toolkit.application", fake_pt_app) + + cli._cprint("x") + + assert calls == [("pt_print", "x")] + + +def test_cprint_bg_thread_schedules_on_app_loop(monkeypatch): + """App running + different thread โ†’ schedules via call_soon_threadsafe.""" + scheduled = [] + direct_prints = [] + + monkeypatch.setattr(cli, "_pt_print", lambda x: direct_prints.append(x)) + monkeypatch.setattr(cli, "_PT_ANSI", lambda t: t) + + class FakeLoop: + def is_running(self): + return True + + def call_soon_threadsafe(self, cb, *args): + scheduled.append(cb) + + fake_loop = FakeLoop() + + # Install a fake "current loop" that is NOT the app's loop, so the + # cross-thread branch is taken. + fake_current_loop = SimpleNamespace(is_running=lambda: True) + fake_asyncio = types.ModuleType("asyncio") + + class _Policy: + def get_event_loop(self): + return fake_current_loop + + fake_asyncio.get_event_loop_policy = lambda: _Policy() + monkeypatch.setitem(sys.modules, "asyncio", fake_asyncio) + + fake_app = SimpleNamespace(_is_running=True, loop=fake_loop) + fake_pt_app = types.ModuleType("prompt_toolkit.application") + fake_pt_app.get_app_or_none = lambda: fake_app + + run_in_terminal_calls = [] + + def _fake_run_in_terminal(func, **kw): + run_in_terminal_calls.append(func) + # Simulate run_in_terminal actually calling func (as the real PT + # impl would once the app loop tick picks it up). + func() + return None + + fake_pt_app.run_in_terminal = _fake_run_in_terminal + monkeypatch.setitem(sys.modules, "prompt_toolkit.application", fake_pt_app) + + cli._cprint("๐Ÿ’พ Self-improvement review: Skill updated") + + # call_soon_threadsafe must have been called with a scheduling cb. + assert len(scheduled) == 1 + + # Invoking the scheduled callback should hit run_in_terminal. + scheduled[0]() + assert len(run_in_terminal_calls) == 1 + + # And run_in_terminal's inner func should have emitted a pt_print. + assert direct_prints == ["๐Ÿ’พ Self-improvement review: Skill updated"] + + +def test_cprint_same_thread_as_app_loop_direct_print(monkeypatch): + """App running on same thread โ†’ direct print (no scheduling).""" + direct_prints = [] + monkeypatch.setattr(cli, "_pt_print", lambda x: direct_prints.append(x)) + monkeypatch.setattr(cli, "_PT_ANSI", lambda t: t) + + class FakeLoop: + def is_running(self): + return True + + def call_soon_threadsafe(self, cb, *args): + raise AssertionError( + "call_soon_threadsafe must not be used on the app's own thread" + ) + + fake_loop = FakeLoop() + fake_asyncio = types.ModuleType("asyncio") + + class _Policy: + def get_event_loop(self): + return fake_loop # same as app loop + + fake_asyncio.get_event_loop_policy = lambda: _Policy() + monkeypatch.setitem(sys.modules, "asyncio", fake_asyncio) + + fake_app = SimpleNamespace(_is_running=True, loop=fake_loop) + fake_pt_app = types.ModuleType("prompt_toolkit.application") + fake_pt_app.get_app_or_none = lambda: fake_app + fake_pt_app.run_in_terminal = lambda *a, **kw: None + monkeypatch.setitem(sys.modules, "prompt_toolkit.application", fake_pt_app) + + cli._cprint("x") + + assert direct_prints == ["x"] + + +def test_cprint_swallows_app_loop_attr_error(monkeypatch): + """Loop missing on app โ†’ fall back to direct print, no crash.""" + direct_prints = [] + monkeypatch.setattr(cli, "_pt_print", lambda x: direct_prints.append(x)) + monkeypatch.setattr(cli, "_PT_ANSI", lambda t: t) + + class WeirdApp: + _is_running = True + + @property + def loop(self): + raise RuntimeError("no loop for you") + + fake_pt_app = types.ModuleType("prompt_toolkit.application") + fake_pt_app.get_app_or_none = lambda: WeirdApp() + fake_pt_app.run_in_terminal = lambda *a, **kw: None + monkeypatch.setitem(sys.modules, "prompt_toolkit.application", fake_pt_app) + + cli._cprint("fallback") + + assert direct_prints == ["fallback"] + + +def test_cprint_swallows_prompt_toolkit_import_error(monkeypatch): + """If prompt_toolkit.application itself fails to import, fall back.""" + direct_prints = [] + monkeypatch.setattr(cli, "_pt_print", lambda x: direct_prints.append(x)) + monkeypatch.setattr(cli, "_PT_ANSI", lambda t: t) + + # Drop cached prompt_toolkit.application AND install a meta-path finder + # that raises ImportError on re-import. + monkeypatch.delitem(sys.modules, "prompt_toolkit.application", raising=False) + + class _BlockFinder: + def find_module(self, name, path=None): + if name == "prompt_toolkit.application": + return self + return None + + def load_module(self, name): + raise ImportError("blocked for test") + + def find_spec(self, name, path=None, target=None): + if name == "prompt_toolkit.application": + # Returning a bogus spec that will fail on load works too, + # but raising here keeps the test simple. + raise ImportError("blocked for test") + return None + + blocker = _BlockFinder() + sys.meta_path.insert(0, blocker) + try: + cli._cprint("fallback2") + finally: + sys.meta_path.remove(blocker) + + assert direct_prints == ["fallback2"] diff --git a/tests/run_agent/test_background_review.py b/tests/run_agent/test_background_review.py index 2fc67414d34..8f2a61b7504 100644 --- a/tests/run_agent/test_background_review.py +++ b/tests/run_agent/test_background_review.py @@ -127,3 +127,66 @@ def test_background_review_installs_auto_deny_approval_callback(monkeypatch): "Background review leaked its approval callback into the worker " "thread's TLS slot; a recycled thread-id could reuse it." ) + + +def test_background_review_summary_is_attributed_to_self_improvement_loop(monkeypatch): + """The CLI/gateway emission must identify the self-improvement loop. + + Users who miss the line in their terminal have no way to tell that the + background review was what modified their skill/memory stores. The + summary prefix ``๐Ÿ’พ Self-improvement review: โ€ฆ`` makes the origin + explicit so both the CLI and gateway deliveries are unambiguous. + """ + import json + + captured_prints: list = [] + captured_bg_callback: list = [] + + class FakeReviewAgent: + def __init__(self, **kwargs): + # Simulate a review that successfully updated memory so + # _summarize_background_review_actions returns a real action. + self._session_messages = [ + { + "role": "tool", + "tool_call_id": "call_bg", + "content": json.dumps( + {"success": True, "message": "Entry added", "target": "memory"} + ), + } + ] + + def run_conversation(self, **kwargs): + pass + + def shutdown_memory_provider(self): + pass + + def close(self): + pass + + monkeypatch.setattr(run_agent_module, "AIAgent", FakeReviewAgent) + monkeypatch.setattr(run_agent_module.threading, "Thread", ImmediateThread) + + agent = _bare_agent() + agent._safe_print = lambda *a, **kw: captured_prints.append(" ".join(str(x) for x in a)) + agent.background_review_callback = lambda msg: captured_bg_callback.append(msg) + + AIAgent._spawn_background_review( + agent, + messages_snapshot=[{"role": "user", "content": "hi"}], + review_memory=True, + ) + + # Exactly one summary should have been emitted, and it must identify + # the self-improvement review explicitly. + assert len(captured_prints) == 1, captured_prints + printed = captured_prints[0] + assert "Self-improvement review" in printed, printed + assert "Memory updated" in printed, printed + + # Gateway path gets the same prefix. + assert len(captured_bg_callback) == 1 + assert captured_bg_callback[0].startswith("๐Ÿ’พ Self-improvement review:"), ( + captured_bg_callback[0] + )