feat(telegram): send fresh finals for stale preview streams (port openclaw#72038) (#16261)

Ports openclaw/openclaw#72038 to hermes-agent.

Telegram's `editMessageText` preserves the original message timestamp,
so a long-running streamed reply (e.g. from a reasoning model that takes
60+ seconds to finish) keeps its first-token timestamp even after
completion. Users can't tell when the task actually finished.

When a preview message has been visible for >= 60s (configurable via
`streaming.fresh_final_after_seconds`), finalize by sending a fresh
message instead of editing in place, then best-effort delete the stale
preview. Short previews still edit in place (the existing fast path).

Implementation notes adapted from OpenClaw's TypeScript original:
- `StreamConsumerConfig` gains `fresh_final_after_seconds` (default 0 =
  legacy edit-in-place). Gateway-level `StreamingConfig` defaults to 60.
- `GatewayStreamConsumer` tracks `_message_created_ts` at first-send and
  checks it in `_send_or_edit` on `finalize=True`. New helpers
  `_should_send_fresh_final` + `_try_fresh_final`.
- `BasePlatformAdapter` gains optional `delete_message(chat_id, message_id)`
  returning False by default. `TelegramAdapter` implements it via
  `_bot.delete_message`.
- `gateway/run.py` only enables fresh-final for `Platform.TELEGRAM`;
  other platforms ignore the setting (they either don't have the
  stale-timestamp-on-edit problem or edit in place cheaply).
- Fallback to normal edit on any fresh-send failure — no user-visible
  regression if Telegram rate-limits a send or the message is gone.
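
The finalize-time check described in the notes above can be sketched as a
pure function. This is an illustration only, not the code in this commit:
`should_send_fresh_final` and its explicit `now` argument are hypothetical
names, chosen so the check can be exercised without a real clock. The
`__no_edit__` sentinel and the `>=` comparison follow the description above.

```python
from typing import Optional

NO_EDIT = "__no_edit__"  # sentinel: a message was sent but cannot be edited


def should_send_fresh_final(
    threshold: float,
    message_id: Optional[str],
    created_ts: Optional[float],
    now: float,
) -> bool:
    """Return True when the preview is old enough to replace with a fresh send."""
    if threshold <= 0:  # 0 (or a negative value) disables the feature
        return False
    if not message_id or message_id == NO_EDIT:  # no addressable preview
        return False
    if created_ts is None:  # preview creation time was never recorded
        return False
    return (now - created_ts) >= threshold
```

With a threshold of 60, a preview created at t=100 is replaced when
finalized at t=160 and edited in place when finalized at t=159.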

Tests: 15 new cases in tests/gateway/test_stream_consumer_fresh_final.py
covering short/long previews, config plumbing, delete-support absent,
send-failure fallback, __no_edit__ sentinel safety, and StreamingConfig
round-trip.

Co-authored-by: Hermes Agent <agent@nousresearch.com>
Teknium
2026-04-26 17:26:37 -07:00
committed by GitHub
parent 755a280424
commit b16f9d438b
7 changed files with 427 additions and 0 deletions


@@ -195,6 +195,14 @@ class StreamingConfig:
edit_interval: float = 1.0 # Seconds between message edits (Telegram rate-limits at ~1/s)
buffer_threshold: int = 40 # Chars before forcing an edit
cursor: str = "" # Cursor shown during streaming
# Ported from openclaw/openclaw#72038. When >0, the final edit for
# a long-running streamed response is delivered as a fresh message
# if the original preview has been visible for at least this many
# seconds, so the platform's visible timestamp reflects completion
# time instead of the preview creation time. Currently applied to
# Telegram only (other platforms ignore the setting). Default 60s
# matches the OpenClaw rollout. Set to 0 to disable.
fresh_final_after_seconds: float = 60.0
def to_dict(self) -> Dict[str, Any]:
return {
@@ -203,6 +211,7 @@ class StreamingConfig:
"edit_interval": self.edit_interval,
"buffer_threshold": self.buffer_threshold,
"cursor": self.cursor,
"fresh_final_after_seconds": self.fresh_final_after_seconds,
}
@classmethod
@@ -215,6 +224,9 @@ class StreamingConfig:
edit_interval=float(data.get("edit_interval", 1.0)),
buffer_threshold=int(data.get("buffer_threshold", 40)),
cursor=data.get("cursor", ""),
fresh_final_after_seconds=float(
data.get("fresh_final_after_seconds", 60.0)
),
)
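
The serialization change above can be illustrated with a reduced stand-in
class. `StreamingConfigSketch` is a hypothetical name and carries only the
new field; the real `StreamingConfig` has more fields than this hunk shows.
The sketch shows that a missing key falls back to 60 seconds while an
explicit `0` is preserved.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class StreamingConfigSketch:
    """Hypothetical stand-in that carries only the new field."""

    fresh_final_after_seconds: float = 60.0

    def to_dict(self) -> Dict[str, Any]:
        return {"fresh_final_after_seconds": self.fresh_final_after_seconds}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "StreamingConfigSketch":
        # dict.get only uses the default when the key is absent, so an
        # explicit 0 (feature disabled) is kept as 0.0.
        return cls(
            fresh_final_after_seconds=float(
                data.get("fresh_final_after_seconds", 60.0)
            )
        )


missing = StreamingConfigSketch.from_dict({})
disabled = StreamingConfigSketch.from_dict({"fresh_final_after_seconds": 0})
round_trip = StreamingConfigSketch.from_dict(
    StreamingConfigSketch(fresh_final_after_seconds=90.0).to_dict()
)
```

One consequence of this shape: a key that is present but set to `None`
(for example a YAML `null`) would reach `float(None)` and raise `TypeError`.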


@@ -1258,6 +1258,27 @@ class BasePlatformAdapter(ABC):
"""
return SendResult(success=False, error="Not supported")
    async def delete_message(
        self,
        chat_id: str,
        message_id: str,
    ) -> bool:
        """
        Delete a previously sent message. Optional: platforms that don't
        support deletion return ``False`` and callers fall back to leaving
        the message in place.

        Used by the stream consumer's fresh-final cleanup path (see
        openclaw/openclaw#72038) to remove long-lived preview messages
        after sending the completed reply as a fresh message so the
        platform's visible timestamp reflects completion time.

        Returns ``True`` on successful deletion, ``False`` otherwise.
        Subclasses should override for platforms with a deletion API
        (e.g. Telegram ``deleteMessage``).
        """
        return False

async def send_typing(self, chat_id: str, metadata=None) -> None:
"""
Send a typing indicator.


@@ -1209,6 +1209,31 @@ class TelegramAdapter(BasePlatformAdapter):
)
return SendResult(success=False, error=str(e))
    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        """Delete a previously sent Telegram message.

        Used by the stream consumer's fresh-final cleanup path (ported
        from openclaw/openclaw#72038) to remove long-lived preview
        messages after sending the completed reply as a fresh message.

        Telegram's Bot API ``deleteMessage`` works for bot-posted
        messages in the last 48 hours. Failures are non-fatal: the
        caller leaves the preview in place and logs at debug level.
        """
        if not self._bot:
            return False
        try:
            await self._bot.delete_message(
                chat_id=int(chat_id),
                message_id=int(message_id),
            )
            return True
        except Exception as e:
            logger.debug(
                "[%s] Failed to delete Telegram message %s: %s",
                self.name, message_id, e,
            )
            return False

async def send_update_prompt(
self, chat_id: str, prompt: str, default: str = "",
session_key: str = "",
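
The optional-deletion contract introduced in the two adapter hunks above can
be sketched with stand-in classes. `BaseAdapterSketch`, `DeletingAdapterSketch`
and `FakeBot` are hypothetical names used only for this illustration; `FakeBot`
is not the Telegram client. The point is that deletion never raises to the
caller: it returns `True` on success and `False` for every other outcome.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class BaseAdapterSketch:
    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        return False  # default: deletion is not supported


class FakeBot:
    """Hypothetical bot client; raises for message ids it does not know."""

    def __init__(self, known_ids):
        self.known_ids = set(known_ids)

    async def delete_message(self, chat_id: int, message_id: int) -> None:
        if message_id not in self.known_ids:
            raise RuntimeError("message to delete not found")
        self.known_ids.discard(message_id)


class DeletingAdapterSketch(BaseAdapterSketch):
    def __init__(self, bot):
        self._bot = bot

    async def delete_message(self, chat_id: str, message_id: str) -> bool:
        if not self._bot:
            return False
        try:
            await self._bot.delete_message(
                chat_id=int(chat_id), message_id=int(message_id)
            )
            return True
        except Exception as exc:  # best-effort: never propagate
            logger.debug("delete failed for %s: %s", message_id, exc)
            return False


adapter = DeletingAdapterSketch(FakeBot(known_ids=[7]))
first = asyncio.run(adapter.delete_message("100", "7"))   # known id
second = asyncio.run(adapter.delete_message("100", "7"))  # already deleted
unsupported = asyncio.run(BaseAdapterSketch().delete_message("100", "7"))
```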


@@ -9154,11 +9154,21 @@ class GatewayRunner:
if source.platform == Platform.MATRIX:
_effective_cursor = ""
_buffer_only = True
# Fresh-final applies to Telegram only — other
# platforms either edit in place cheaply (Discord,
# Slack) or don't have the timestamp-on-edit
# problem. (Ported from openclaw/openclaw#72038.)
_fresh_final_secs = (
float(getattr(_scfg, "fresh_final_after_seconds", 0.0) or 0.0)
if source.platform == Platform.TELEGRAM
else 0.0
)
_consumer_cfg = StreamConsumerConfig(
edit_interval=_scfg.edit_interval,
buffer_threshold=_scfg.buffer_threshold,
cursor=_effective_cursor,
buffer_only=_buffer_only,
fresh_final_after_seconds=_fresh_final_secs,
)
_stream_consumer = GatewayStreamConsumer(
adapter=_adapter,
@@ -9842,11 +9852,21 @@ class GatewayRunner:
if source.platform == Platform.MATRIX:
_effective_cursor = ""
_buffer_only = True
# Fresh-final applies to Telegram only — other
# platforms either edit in place cheaply or don't
# have the edit-timestamp-stays-stale problem.
# (Ported from openclaw/openclaw#72038.)
_fresh_final_secs = (
float(getattr(_scfg, "fresh_final_after_seconds", 0.0) or 0.0)
if source.platform == Platform.TELEGRAM
else 0.0
)
_consumer_cfg = StreamConsumerConfig(
edit_interval=_scfg.edit_interval,
buffer_threshold=_scfg.buffer_threshold,
cursor=_effective_cursor,
buffer_only=_buffer_only,
fresh_final_after_seconds=_fresh_final_secs,
)
_stream_consumer = GatewayStreamConsumer(
adapter=_adapter,
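
Both hunks above apply the same per-platform gate. A small helper shows the
logic in isolation; `effective_fresh_final_seconds` is a hypothetical name
(the commit inlines the expression twice), and the `Platform` enum here is a
reduced stand-in for the real one.

```python
from enum import Enum
from types import SimpleNamespace


class Platform(Enum):  # hypothetical subset of the real enum
    TELEGRAM = "telegram"
    DISCORD = "discord"


def effective_fresh_final_seconds(platform: Platform, streaming_cfg) -> float:
    """Return the configured threshold for Telegram and 0.0 everywhere else."""
    if platform is not Platform.TELEGRAM:
        return 0.0
    raw = getattr(streaming_cfg, "fresh_final_after_seconds", 0.0)
    return float(raw or 0.0)  # a missing attribute or None both become 0.0


cfg = SimpleNamespace(fresh_final_after_seconds=60.0)
telegram_secs = effective_fresh_final_seconds(Platform.TELEGRAM, cfg)
discord_secs = effective_fresh_final_seconds(Platform.DISCORD, cfg)
legacy_secs = effective_fresh_final_seconds(Platform.TELEGRAM, SimpleNamespace())
```

Factoring the expression out like this would also keep the two call sites
from drifting apart, though that is a design suggestion rather than something
this commit does.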


@@ -44,6 +44,14 @@ class StreamConsumerConfig:
buffer_threshold: int = 40
cursor: str = ""
buffer_only: bool = False
# When >0, the final edit for a streamed response is delivered as a
# fresh message if the original preview has been visible for at least
# this many seconds. This makes the platform's visible timestamp
# reflect completion time instead of first-token time for long-running
# responses (e.g. reasoning models that stream slowly). Ported from
# openclaw/openclaw#72038. Default 0 = always edit in place (legacy
# behavior). The gateway enables this selectively per-platform.
fresh_final_after_seconds: float = 0.0
class GatewayStreamConsumer:
@@ -91,6 +99,12 @@ class GatewayStreamConsumer:
self._queue: queue.Queue = queue.Queue()
self._accumulated = ""
self._message_id: Optional[str] = None
# Monotonic-clock timestamp (``time.monotonic()``) taken when
# ``_message_id`` was first assigned from a successful first-send.
# Used by the fresh-final logic to detect long-lived previews whose
# visible timestamp would be stale by completion time. Ported from
# openclaw/openclaw#72038.
self._message_created_ts: Optional[float] = None
self._already_sent = False
self._edit_supported = True # Disabled when progressive edits are no longer usable
self._last_edit_time = 0.0
@@ -136,6 +150,7 @@ class GatewayStreamConsumer:
if preserve_no_edit and self._message_id == "__no_edit__":
return
self._message_id = None
self._message_created_ts = None
self._accumulated = ""
self._last_sent_text = ""
self._fallback_final_send = False
@@ -734,6 +749,81 @@ class GatewayStreamConsumer:
logger.error("Commentary send error: %s", e)
return False
    def _should_send_fresh_final(self) -> bool:
        """Return True when a long-lived preview should be replaced with a
        fresh final message instead of an edit.

        Conditions:
          - Fresh-final is enabled (``fresh_final_after_seconds > 0``).
          - We have a real preview message id (not the ``__no_edit__`` sentinel
            and not ``None``).
          - The preview has been visible for at least the configured threshold.

        Ported from openclaw/openclaw#72038.
        """
        threshold = getattr(self.cfg, "fresh_final_after_seconds", 0.0) or 0.0
        if threshold <= 0:
            return False
        if not self._message_id or self._message_id == "__no_edit__":
            return False
        if self._message_created_ts is None:
            return False
        age = time.monotonic() - self._message_created_ts
        return age >= threshold

    async def _try_fresh_final(self, text: str) -> bool:
        """Send ``text`` as a brand-new message (best-effort delete the old
        preview) so the platform's visible timestamp reflects completion
        time. Returns True on successful delivery, False on any failure so
        the caller falls back to the normal edit path.

        Ported from openclaw/openclaw#72038.
        """
        old_message_id = self._message_id
        try:
            result = await self.adapter.send(
                chat_id=self.chat_id,
                content=text,
                metadata=self.metadata,
            )
        except Exception as e:
            logger.debug("Fresh-final send failed, falling back to edit: %s", e)
            return False
        if not getattr(result, "success", False):
            return False
        # Successful fresh send: try to delete the stale preview so the
        # user doesn't see the old edit-stuck message underneath. Cleanup
        # is best-effort; platforms that don't implement ``delete_message``
        # just leave the preview behind (still an acceptable outcome,
        # since the visible final timestamp is the important part).
        if old_message_id and old_message_id != "__no_edit__":
            delete_fn = getattr(self.adapter, "delete_message", None)
            if delete_fn is not None:
                try:
                    await delete_fn(self.chat_id, old_message_id)
                except Exception as e:
                    logger.debug(
                        "Fresh-final preview cleanup failed (%s): %s",
                        old_message_id, e,
                    )
        # Adopt the new message id as the current message so subsequent
        # callers (e.g. overflow split loops, finalize retries) see a
        # consistent state.
        new_message_id = getattr(result, "message_id", None)
        if new_message_id:
            self._message_id = new_message_id
            self._message_created_ts = time.monotonic()
        else:
            # Send succeeded but platform didn't return an id: treat the
            # delivery as final-only and fall back to "__no_edit__" so we
            # don't try to edit something we can't address.
            self._message_id = "__no_edit__"
            self._message_created_ts = None
        self._already_sent = True
        self._last_sent_text = text
        self._final_response_sent = True
        return True

async def _send_or_edit(self, text: str, *, finalize: bool = False) -> bool:
"""Send or edit the streaming message.
@@ -786,6 +876,22 @@ class GatewayStreamConsumer:
finalize and self._adapter_requires_finalize
):
return True
# Fresh-final for long-lived previews: when finalizing
# the last edit in a streaming sequence, if the
# original preview has been visible for at least
# ``fresh_final_after_seconds``, send the completed
# reply as a fresh message so the platform's visible
# timestamp reflects completion time instead of the
# preview creation time. Best-effort cleanup of the
# old preview follows. Ported from
# openclaw/openclaw#72038. Gated by config so the
# legacy edit-in-place path stays the default.
if (
finalize
and self._should_send_fresh_final()
and await self._try_fresh_final(text)
):
return True
# Edit existing message
result = await self.adapter.edit_message(
chat_id=self.chat_id,
@@ -852,6 +958,10 @@ class GatewayStreamConsumer:
if result.success:
if result.message_id:
self._message_id = result.message_id
# Track when the preview first became visible to
# the user so fresh-final logic can detect stale
# preview timestamps on long-running responses.
self._message_created_ts = time.monotonic()
else:
self._edit_supported = False
self._already_sent = True
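
Taken together, the hunks above give finalization three possible outcomes:
a fresh send plus cleanup, a fresh send with no cleanup, or a fallback to the
ordinary edit. The sketch below reproduces only that control flow. `finalize`,
`FakeAdapter`, and the keyword arguments passed to `edit_message` are
assumptions made for this illustration; the real consumer tracks considerably
more state.

```python
import asyncio
from types import SimpleNamespace


async def finalize(adapter, chat_id: str, preview_id: str, text: str, stale: bool) -> str:
    """Return "fresh" or "edit" depending on which path delivered the text."""
    if stale:
        try:
            result = await adapter.send(chat_id=chat_id, content=text)
        except Exception:
            result = None  # treat an exception like a failed send
        if result is not None and getattr(result, "success", False):
            delete_fn = getattr(adapter, "delete_message", None)
            if delete_fn is not None:
                try:
                    await delete_fn(chat_id, preview_id)
                except Exception:
                    pass  # cleanup is best-effort
            return "fresh"
    # Either the preview is young or the fresh send failed: edit in place.
    await adapter.edit_message(chat_id=chat_id, message_id=preview_id, content=text)
    return "edit"


class FakeAdapter:
    """Hypothetical adapter that records which calls were made."""

    def __init__(self, send_ok: bool):
        self.send_ok = send_ok
        self.calls = []

    async def send(self, chat_id, content):
        self.calls.append("send")
        return SimpleNamespace(
            success=self.send_ok, message_id="new" if self.send_ok else None
        )

    async def edit_message(self, chat_id, message_id, content):
        self.calls.append("edit")
        return SimpleNamespace(success=True, message_id=message_id)

    async def delete_message(self, chat_id, message_id):
        self.calls.append("delete")
        return True


ok_adapter = FakeAdapter(send_ok=True)
failing_adapter = FakeAdapter(send_ok=False)
ok_path = asyncio.run(finalize(ok_adapter, "chat", "old", "done", stale=True))
fallback_path = asyncio.run(finalize(failing_adapter, "chat", "old", "done", stale=True))
```

The delete happens only after the fresh send has succeeded, so a failed send
never leaves the chat without any copy of the reply.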


@@ -0,0 +1,236 @@
"""Regression tests for the fresh-final-for-long-lived-previews path.
Ported from openclaw/openclaw#72038. When a streamed preview has been
visible long enough that the platform's edit timestamp would be
noticeably stale by completion time, the stream consumer delivers the
final reply as a brand-new message and best-effort deletes the old
preview. This makes Telegram's visible timestamp reflect completion
time instead of first-token time.
"""
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import pytest
from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig
def _make_adapter(*, supports_delete: bool = True) -> MagicMock:
"""Build a minimal MagicMock adapter wired for send/edit/delete."""
adapter = MagicMock()
adapter.REQUIRES_EDIT_FINALIZE = False
adapter.MAX_MESSAGE_LENGTH = 4096
adapter.send = AsyncMock(return_value=SimpleNamespace(
success=True, message_id="initial_preview",
))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(
success=True, message_id="initial_preview",
))
if supports_delete:
adapter.delete_message = AsyncMock(return_value=True)
else:
# Adapter without the optional delete_message method — fresh-final
# should still work, it just leaves the stale preview in place.
del adapter.delete_message # type: ignore[attr-defined]
return adapter
class TestFreshFinalForLongLivedPreviews:
"""openclaw#72038 port — send fresh final when preview is old."""
@pytest.mark.asyncio
async def test_disabled_by_default_still_edits_in_place(self):
"""``fresh_final_after_seconds=0`` preserves the legacy edit path."""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter=adapter,
chat_id="chat",
config=StreamConsumerConfig(fresh_final_after_seconds=0.0),
)
await consumer._send_or_edit("hello")
# Pretend the preview has been visible for a long time.
consumer._message_created_ts = 0.0 # far in the past
await consumer._send_or_edit("hello world", finalize=True)
# Should edit, not send a fresh message.
assert adapter.send.call_count == 1 # only the initial send
adapter.edit_message.assert_called_once()
@pytest.mark.asyncio
async def test_short_lived_preview_edits_in_place(self):
"""Finalizing a preview younger than the threshold → normal edit."""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter=adapter,
chat_id="chat",
config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
)
await consumer._send_or_edit("hello")
# Preview is "new" — leave _message_created_ts at its real value.
await consumer._send_or_edit("hello world", finalize=True)
assert adapter.send.call_count == 1
adapter.edit_message.assert_called_once()
@pytest.mark.asyncio
async def test_long_lived_preview_sends_fresh_final(self):
"""Finalizing a preview older than the threshold → fresh send."""
adapter = _make_adapter()
adapter.send.side_effect = [
SimpleNamespace(success=True, message_id="initial_preview"),
SimpleNamespace(success=True, message_id="fresh_final"),
]
consumer = GatewayStreamConsumer(
adapter=adapter,
chat_id="chat",
config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
)
await consumer._send_or_edit("hello")
# Force the preview to look stale (visible for > 60s). This relies on
# time.monotonic() currently returning more than 60 on the test host.
consumer._message_created_ts = 0.0
await consumer._send_or_edit("hello world", finalize=True)
# Fresh send happened; no edit of the old preview.
assert adapter.send.call_count == 2
adapter.edit_message.assert_not_called()
# The old preview was deleted as cleanup.
adapter.delete_message.assert_awaited_once_with("chat", "initial_preview")
# State was updated to the new message id.
assert consumer._message_id == "fresh_final"
assert consumer._final_response_sent is True
@pytest.mark.asyncio
async def test_fresh_final_without_delete_support_is_best_effort(self):
"""Adapter lacking ``delete_message`` still gets the fresh send."""
adapter = _make_adapter(supports_delete=False)
adapter.send.side_effect = [
SimpleNamespace(success=True, message_id="initial_preview"),
SimpleNamespace(success=True, message_id="fresh_final"),
]
consumer = GatewayStreamConsumer(
adapter=adapter,
chat_id="chat",
config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
)
await consumer._send_or_edit("hello")
consumer._message_created_ts = 0.0
await consumer._send_or_edit("hello world", finalize=True)
assert adapter.send.call_count == 2
adapter.edit_message.assert_not_called()
# No delete attempt — just the fresh send.
assert consumer._message_id == "fresh_final"
@pytest.mark.asyncio
async def test_fresh_final_fallback_to_edit_on_send_failure(self):
"""If the fresh send fails, fall back to the normal edit path."""
adapter = _make_adapter()
adapter.send.side_effect = [
SimpleNamespace(success=True, message_id="initial_preview"),
SimpleNamespace(success=False, error="network"),
]
consumer = GatewayStreamConsumer(
adapter=adapter,
chat_id="chat",
config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
)
await consumer._send_or_edit("hello")
consumer._message_created_ts = 0.0
ok = await consumer._send_or_edit("hello world", finalize=True)
# Fresh send was attempted and failed → edit happened instead.
assert adapter.send.call_count == 2
adapter.edit_message.assert_called_once()
assert ok is True
@pytest.mark.asyncio
async def test_only_finalize_triggers_fresh_final(self):
"""Intermediate edits (``finalize=False``) never switch to fresh send."""
adapter = _make_adapter()
consumer = GatewayStreamConsumer(
adapter=adapter,
chat_id="chat",
config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
)
await consumer._send_or_edit("hello")
consumer._message_created_ts = 0.0 # stale
await consumer._send_or_edit("hello partial") # no finalize
assert adapter.send.call_count == 1
adapter.edit_message.assert_called_once()
@pytest.mark.asyncio
async def test_no_edit_sentinel_is_not_affected(self):
"""Platforms with the ``__no_edit__`` sentinel never go fresh-final."""
adapter = _make_adapter()
adapter.send.return_value = SimpleNamespace(success=True, message_id=None)
consumer = GatewayStreamConsumer(
adapter=adapter,
chat_id="chat",
config=StreamConsumerConfig(fresh_final_after_seconds=60.0),
)
await consumer._send_or_edit("hello")
assert consumer._message_id == "__no_edit__"
assert consumer._message_created_ts is None
# Even with finalize=True, no fresh send — the sentinel gates it.
assert consumer._should_send_fresh_final() is False
class TestStreamConsumerConfigFreshFinalField:
"""The dataclass field must exist and default to 0 (disabled)."""
def test_default_is_disabled(self):
cfg = StreamConsumerConfig()
assert cfg.fresh_final_after_seconds == 0.0
def test_field_is_configurable(self):
cfg = StreamConsumerConfig(fresh_final_after_seconds=120.0)
assert cfg.fresh_final_after_seconds == 120.0
class TestStreamingConfigFreshFinalField:
"""The gateway-level StreamingConfig carries the setting."""
def test_default_enables_with_60s(self):
from gateway.config import StreamingConfig
cfg = StreamingConfig()
assert cfg.fresh_final_after_seconds == 60.0
def test_from_dict_uses_default_when_missing(self):
from gateway.config import StreamingConfig
cfg = StreamingConfig.from_dict({"enabled": True})
assert cfg.fresh_final_after_seconds == 60.0
def test_from_dict_respects_explicit_zero(self):
from gateway.config import StreamingConfig
cfg = StreamingConfig.from_dict({
"enabled": True,
"fresh_final_after_seconds": 0,
})
assert cfg.fresh_final_after_seconds == 0.0
def test_to_dict_round_trip(self):
from gateway.config import StreamingConfig
original = StreamingConfig(fresh_final_after_seconds=90.0)
restored = StreamingConfig.from_dict(original.to_dict())
assert restored.fresh_final_after_seconds == 90.0
class TestTelegramAdapterDeleteMessage:
"""Contract: Telegram adapter implements ``delete_message``."""
def test_delete_message_method_exists(self):
telegram = pytest.importorskip("gateway.platforms.telegram")
import inspect
cls = telegram.TelegramAdapter
assert hasattr(cls, "delete_message"), (
"TelegramAdapter.delete_message is required for the fresh-final "
"cleanup path (openclaw/openclaw#72038 port)."
)
sig = inspect.signature(cls.delete_message)
params = list(sig.parameters)
assert params[:3] == ["self", "chat_id", "message_id"]
def test_base_adapter_default_returns_false(self):
"""BasePlatformAdapter.delete_message default = no-op returning False."""
from gateway.platforms.base import BasePlatformAdapter
import inspect
sig = inspect.signature(BasePlatformAdapter.delete_message)
assert list(sig.parameters)[:3] == ["self", "chat_id", "message_id"]
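
The tests above mark a preview as stale by setting `_message_created_ts` to
`0.0`, which works as long as `time.monotonic()` already exceeds the
threshold. Python documents the reference point of `time.monotonic()` as
undefined, so a more portable variant is to subtract from the current reading.
`make_stale` is a hypothetical helper, not part of this commit.

```python
import time


def make_stale(threshold_seconds: float, margin: float = 1.0) -> float:
    """Return a monotonic timestamp that is already older than the threshold."""
    return time.monotonic() - (threshold_seconds + margin)


created_ts = make_stale(60.0)
age = time.monotonic() - created_ts  # at least 61 seconds by construction
```

In a test this would be used as `consumer._message_created_ts = make_stale(60.0)`.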


@@ -1114,6 +1114,7 @@ streaming:
edit_interval: 0.3 # Seconds between message edits
buffer_threshold: 40 # Characters before forcing an edit flush
cursor: " ▉" # Cursor shown during streaming
fresh_final_after_seconds: 60 # Send fresh final (Telegram) when preview is this old; 0 = always edit in place
```
When enabled, the bot sends a message on the first token, then progressively edits it as more tokens arrive. Platforms that don't support message editing (Signal, Email, Home Assistant) are auto-detected on the first attempt — streaming is gracefully disabled for that session with no flood of messages.
@@ -1122,6 +1123,8 @@ For separate natural mid-turn assistant updates without progressive token editin
**Overflow handling:** If the streamed text exceeds the platform's message length limit (~4096 chars), the current message is finalized and a new one starts automatically.
**Fresh final (Telegram):** Telegram's `editMessageText` preserves the original message timestamp, so a long-running streamed reply would keep the first-token timestamp even after completion. When `fresh_final_after_seconds > 0` (default `60`), the completed reply is delivered as a brand-new message (with the stale preview best-effort deleted) so Telegram's visible timestamp reflects completion time. Short previews still finalize in place. Set to `0` to always edit in place.
:::note
Streaming is disabled by default. Enable it in `~/.hermes/config.yaml` to try the streaming UX.
:::