diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 2107e62fd3..e6d96c802d 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -632,10 +632,26 @@ class GatewayStreamConsumer: visible_without_cursor = text if self.cfg.cursor: visible_without_cursor = visible_without_cursor.replace(self.cfg.cursor, "") - if not visible_without_cursor.strip(): + _visible_stripped = visible_without_cursor.strip() + if not _visible_stripped: return True # cursor-only / whitespace-only update if not text.strip(): return True # nothing to send is "success" + # Guard: do not create a brand-new standalone message when the only + # visible content is a handful of characters alongside the streaming + # cursor. During rapid tool-calling the model often emits 1-2 tokens + # before switching to tool calls; the resulting "X ▉" message risks + # leaving the cursor permanently visible if the follow-up edit (to + # strip the cursor on segment break) is rate-limited by the platform. + # This was reported on Telegram, Matrix, and other clients where the + # ▉ block character renders as a visible white box ("tofu"). + # Existing messages (edits) are unaffected — only first sends gated. + _MIN_NEW_MSG_CHARS = 4 + if (self._message_id is None + and self.cfg.cursor + and self.cfg.cursor in text + and len(_visible_stripped) < _MIN_NEW_MSG_CHARS): + return True # too short for a standalone message — accumulate more try: if self._message_id is not None: if self._edit_supported: diff --git a/tests/gateway/test_stream_consumer.py b/tests/gateway/test_stream_consumer.py index 38e536d760..38532e66be 100644 --- a/tests/gateway/test_stream_consumer.py +++ b/tests/gateway/test_stream_consumer.py @@ -155,6 +155,90 @@ class TestSendOrEditMediaStripping: adapter.send.assert_not_called() + @pytest.mark.asyncio + async def test_short_text_with_cursor_skips_new_message(self): + """Short text + cursor should not create a standalone new message. + + During rapid tool-calling the model often emits 1-2 tokens before + switching to tool calls. Sending 'I ▉' as a new message risks + leaving the cursor permanently visible if the follow-up edit is + rate-limited. The guard should skip the first send and let the + text accumulate into the next segment. + """ + adapter = MagicMock() + adapter.send = AsyncMock() + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + StreamConsumerConfig(cursor=" ▉"), + ) + # No message_id yet (first send) — short text + cursor should be skipped + assert consumer._message_id is None + result = await consumer._send_or_edit("I ▉") + assert result is True + adapter.send.assert_not_called() + + # 3 chars is still under the threshold + result = await consumer._send_or_edit("Hi! ▉") + assert result is True + adapter.send.assert_not_called() + + @pytest.mark.asyncio + async def test_longer_text_with_cursor_sends_new_message(self): + """Text >= 4 visible chars + cursor should create a new message normally.""" + adapter = MagicMock() + send_result = SimpleNamespace(success=True, message_id="msg_1") + adapter.send = AsyncMock(return_value=send_result) + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + StreamConsumerConfig(cursor=" ▉"), + ) + result = await consumer._send_or_edit("Hello ▉") + assert result is True + adapter.send.assert_called_once() + + @pytest.mark.asyncio + async def test_short_text_without_cursor_sends_normally(self): + """Short text without cursor (e.g. final edit) should send normally.""" + adapter = MagicMock() + send_result = SimpleNamespace(success=True, message_id="msg_1") + adapter.send = AsyncMock(return_value=send_result) + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + StreamConsumerConfig(cursor=" ▉"), + ) + # No cursor in text — even short text should be sent + result = await consumer._send_or_edit("OK") + assert result is True + adapter.send.assert_called_once() + + @pytest.mark.asyncio + async def test_short_text_cursor_edit_existing_message_allowed(self): + """Short text + cursor editing an existing message should proceed.""" + adapter = MagicMock() + edit_result = SimpleNamespace(success=True) + adapter.edit_message = AsyncMock(return_value=edit_result) + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + StreamConsumerConfig(cursor=" ▉"), + ) + consumer._message_id = "msg_1" # Existing message — guard should not fire + consumer._last_sent_text = "" + result = await consumer._send_or_edit("I ▉") + assert result is True + adapter.edit_message.assert_called_once() + # ── Integration: full stream run ───────────────────────────────────────── @@ -507,7 +591,7 @@ class TestSegmentBreakOnToolBoundary: config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉") consumer = GatewayStreamConsumer(adapter, "chat_123", config) - prefix = "abc" + prefix = "Hello world" tail = "x" * 620 consumer.on_delta(prefix) task = asyncio.create_task(consumer.run())