fix(matrix): E2EE and migration bugfixes (#10860)

* - make Matrix streaming buffer-only (no progressive message edits)
- fix media path handling to expand `~` for the agent
- fix stripping of matrix ID so it no longer removes other mentions / localparts

* fix(matrix): register MembershipEventDispatcher for invite auto-join

The mautrix migration (#7518) broke auto-join because InternalEventType.INVITE
events are only dispatched when MembershipEventDispatcher is registered on the
client. Without it, _on_invite is dead code and the bot silently ignores all
room invites.

Closes #10094
Closes #10725
Refs: PR #10135 (digging-airfare-4u), PR #10732 (fxfitz)
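
The registration requirement can be illustrated with a toy model (not mautrix: `ToyClient`, `MembershipDispatcher`, and the event-type string here are all illustrative). Handlers only fire for internal event types that some registered dispatcher emits, so a handler without its dispatcher is dead code:

```python
class ToyClient:
    """Minimal stand-in (not mautrix) for a client with pluggable dispatchers."""
    def __init__(self):
        self.handlers = {}
        self.dispatchers = []

    def add_event_handler(self, event_type, handler):
        self.handlers.setdefault(event_type, []).append(handler)

    def add_dispatcher(self, dispatcher):
        self.dispatchers.append(dispatcher)

    def receive_membership_event(self, event):
        # Internal event types exist only if a registered dispatcher emits them.
        for dispatcher in self.dispatchers:
            for internal_type in dispatcher.dispatch(event):
                for handler in self.handlers.get(internal_type, []):
                    handler(event)


class MembershipDispatcher:
    """Stand-in for MembershipEventDispatcher: raw membership -> internal INVITE."""
    def dispatch(self, event):
        return ["internal.invite"] if event["membership"] == "invite" else []


invites = []
client = ToyClient()
client.add_event_handler("internal.invite", invites.append)

client.receive_membership_event({"membership": "invite"})
assert invites == []  # handler registered, but no dispatcher: dead code

client.add_dispatcher(MembershipDispatcher())
client.receive_membership_event({"membership": "invite"})
assert len(invites) == 1  # with the dispatcher registered, the handler fires
```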

* fix(matrix): preserve _joined_rooms reference for CryptoStateStore

connect() reassigned self._joined_rooms = set(...) after initial sync,
orphaning the reference captured by _CryptoStateStore at init time.
find_shared_rooms() returned [] forever, breaking Megolm session rotation
on membership changes.

Mutate in place with clear() + update() so the CryptoStateStore reference
stays valid.

Refs #8174, PR #8215
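
The bug is ordinary Python reference semantics; a minimal sketch (class and names illustrative, not the adapter's actual code):

```python
class SharedRoomStore:
    """Illustrative stand-in for _CryptoStateStore: captures a reference."""
    def __init__(self, joined_rooms: set):
        self._joined = joined_rooms  # a reference, not a copy

    def find_shared_rooms(self) -> list:
        return sorted(self._joined)


# Buggy pattern: reassignment orphans the reference held by the store.
joined = set()
store = SharedRoomStore(joined)
joined = {"!room1:example.org"}          # rebinds the local name only
assert store.find_shared_rooms() == []   # store still sees the old, empty set

# Fixed pattern: mutate in place so every holder of the reference sees updates.
joined = set()
store = SharedRoomStore(joined)
joined.clear()
joined.update({"!room1:example.org"})
assert store.find_shared_rooms() == ["!room1:example.org"]
```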

* fix(matrix): remove dual ROOM_ENCRYPTED handler to fix dedup race

mautrix auto-registers DecryptionDispatcher when client.crypto is set.
The adapter also registered _on_encrypted_event for the same event type.
_on_encrypted_event had zero awaits and won the race to mark event IDs
in the dedup set, causing _on_room_message to drop successfully decrypted
events from DecryptionDispatcher. The retry loop masked this by re-decrypting
every message ~4 seconds later.

Remove _on_encrypted_event entirely. DecryptionDispatcher handles decryption;
genuinely undecryptable events are logged by mautrix and retried on next
key exchange.

Refs #8174, PR #8215
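
The race is deterministic, not timing-dependent: a coroutine with no awaits runs to completion the first time the event loop schedules it. A self-contained sketch (handler names illustrative):

```python
import asyncio

seen: set = set()       # the dedup set
delivered: list = []    # events that reach the real message handler

async def fast_handler(event_id):
    # Zero awaits: completes the moment the loop schedules it,
    # so it always marks the event ID first.
    seen.add(event_id)

async def decrypting_handler(event_id):
    await asyncio.sleep(0)  # real decryption yields to the event loop
    if event_id in seen:
        return  # dropped as a "duplicate" despite successful decryption
    seen.add(event_id)
    delivered.append(event_id)

async def main():
    # Both handlers registered for the same event type, scheduled together.
    await asyncio.gather(fast_handler("$evt"), decrypting_handler("$evt"))

asyncio.run(main())
assert delivered == []  # the decrypted event never reaches the message handler
```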

* fix(matrix): re-verify device keys after share_keys() upload

Matrix homeservers treat ed25519 identity keys as immutable per device.
share_keys() can return 200 but silently ignore new keys if the device
already exists with different identity keys. The bot would proceed with
shared=True while peers encrypt to the old (unreachable) keys.

Now re-queries the server after share_keys() and fails closed if keys
don't match, with an actionable error message.

Refs #8174, PR #8215
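
A hedged sketch of the fail-closed check. The call shapes (`query_keys`, `share_keys`, `account.identity_keys`) follow this adapter's test doubles, not necessarily mautrix's exact signatures, and the helper name is ours:

```python
async def verify_device_keys(client, olm) -> bool:
    device_id = client.device_id
    local_key = olm.account.identity_keys["ed25519"]

    async def server_key():
        resp = await client.query_keys({client.mxid: []})
        device = resp.device_keys.get(client.mxid, {}).get(device_id)
        return device.keys.get(f"ed25519:{device_id}") if device else None

    if await server_key() == local_key:
        return True
    await olm.share_keys()
    # A 200 from share_keys() is not proof: the homeserver silently keeps the
    # original identity keys for an already-registered device.
    if await server_key() != local_key:
        return False  # fail closed: peers would encrypt to unreachable keys
    return True
```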

* fix(matrix): encrypt outbound attachments in E2EE rooms

_upload_and_send() uploaded raw bytes and used the 'url' key for all
rooms. In E2EE rooms, media must be encrypted client-side with
encrypt_attachment(), the ciphertext uploaded, and the 'file' key
(with key/iv/hashes) used instead of 'url'.

Now detects encrypted rooms via state_store.is_encrypted() and
branches to the encrypted upload path.

Refs: PR #9822 (charles-brooks)
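
A hedged sketch of the branch. In production `encrypt` would be `mautrix.crypto.attachments.encrypt_attachment` (ciphertext plus an EncryptedFile carrying key/iv/hashes); here it is injected so the sketch stays self-contained, is assumed to return a plain dict, and the helper name is ours:

```python
async def upload_and_send(client, room_id, data, filename, mime, msgtype, *, encrypt):
    content = {"msgtype": msgtype, "body": filename,
               "info": {"mimetype": mime, "size": len(data)}}
    if await client.state_store.is_encrypted(room_id):
        # Encrypt client-side and upload only the ciphertext.
        ciphertext, encrypted_file = encrypt(data)
        mxc = await client.upload_media(ciphertext)
        file_info = dict(encrypted_file)
        file_info["url"] = mxc
        content["file"] = file_info          # E2EE rooms: 'file', never 'url'
    else:
        content["url"] = await client.upload_media(data)
    return await client.send_message_event(room_id, "m.room.message", content)
```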

* fix(matrix): add stop_typing to clear typing indicator after response

The adapter set a 30-second typing timeout but never cleared it.
The base class stop_typing() is a no-op, so the typing indicator
lingered for up to 30 seconds after each response.

Closes #6016
Refs: PR #6020 (r266-tech)
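
A plausible shape of the fix (a sketch; `_client.set_typing` mirrors the call this adapter's tests exercise):

```python
class TypingMixin:
    _client = None

    async def stop_typing(self, room_id: str) -> None:
        if self._client is None:
            return  # not connected: nothing to clear
        try:
            # timeout=0 tells the homeserver to clear the typing indicator.
            await self._client.set_typing(room_id, timeout=0)
        except Exception:
            pass  # best-effort: a failed clear must never break the response
```

Called right after each response is sent, instead of letting the 30-second timeout expire on its own.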

* fix(matrix): cache all media types locally, not just photos/voice

should_cache_locally only covered PHOTO, VOICE, and encrypted media.
Unencrypted audio/video/documents in plaintext rooms were passed as MXC
URLs that require authentication the agent doesn't have, resulting
in 401 errors.

Refs #3487, #3806
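
The gate change reduces to widening a predicate; an illustrative sketch (enum and function shape are ours, not the adapter's actual signatures):

```python
from enum import Enum, auto

class MediaType(Enum):
    PHOTO = auto()
    VOICE = auto()
    AUDIO = auto()
    VIDEO = auto()
    DOCUMENT = auto()

def should_cache_locally(media_type: MediaType, encrypted: bool) -> bool:
    # Old predicate (illustrative): cache only photos, voice notes, or
    # encrypted media; everything else was handed to the agent as an mxc://
    # URL that needs homeserver auth the agent does not have, hence 401s.
    #   return encrypted or media_type in {MediaType.PHOTO, MediaType.VOICE}
    # New behavior: download and cache every media type.
    return True
```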

* fix(matrix): detect stale OTK conflict on startup and fail closed

When crypto state is wiped but the same device ID is reused, the
homeserver may still hold one-time keys signed with the previous
identity key. Identity key re-upload succeeds but OTK uploads fail
with "already exists" and a signature mismatch. Peers cannot
establish new Olm sessions, so all new messages are undecryptable.

Now proactively flushes OTKs via share_keys() during connect() and
catches the "already exists" error with an actionable log message
telling the operator to purge the device from the homeserver or
generate a fresh device ID.

Also documents the crypto store recovery procedure in the Matrix
setup guide.

Refs #8174
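
A hedged sketch of the startup check. The "already exists" substring match and the log wording follow this commit's description; the helper name is ours:

```python
import logging

async def flush_otks_or_fail(olm, device_id: str, log: logging.Logger) -> bool:
    try:
        await olm.share_keys()  # proactively (re)upload one-time keys
    except Exception as exc:
        if "already exists" in str(exc):
            log.error(
                "device %s has stale one-time keys on the server signed with "
                "a previous identity key; purge the device from the "
                "homeserver or generate a fresh device ID",
                device_id,
            )
            return False  # fail closed: silently broken E2EE is worse
        raise
    return True
```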

* docs(matrix): improve crypto recovery docs per review

- Put easy path (fresh access token) first, manual purge second
- URL-encode user ID in Synapse admin API example
- Note that device deletion may invalidate the access token
- Add "stop Synapse first" caveat for direct SQLite approach
- Mention the fail-closed startup detection behavior
- Add back-reference from upgrade section to OTK warning

* refactor(matrix): cleanup from code review

- Extract _extract_server_ed25519() and _reverify_keys_after_upload()
  to deduplicate the re-verification block (was copy-pasted in two
  places, three copies of ed25519 key extraction total)
- Remove dead code: _pending_megolm, _retry_pending_decryptions,
  _MAX_PENDING_EVENTS, _PENDING_EVENT_TTL — all orphaned after
  removing _on_encrypted_event
- Remove tautological TestMediaCacheGate (tested its own predicate,
  not production code)
- Remove dead TestMatrixMegolmEventHandling and
  TestMatrixRetryPendingDecryptions (tested removed methods)
- Merge duplicate TestMatrixStopTyping into TestMatrixTypingIndicator
- Trim comment to just the "why"

Author: Siddharth Balyan (committed by GitHub)
Date: 2026-04-16 15:33:02 -07:00
Parent: 387aa9afc9
Commit: d38b73fa57
8 changed files with 833 additions and 358 deletions


@@ -1291,7 +1291,7 @@ class BasePlatformAdapter(ABC):
path = path[1:-1].strip()
path = path.lstrip("`\"'").rstrip("`\"',.;:)}]")
if path:
-media.append((path, has_voice_tag))
+media.append((os.path.expanduser(path), has_voice_tag))
# Remove MEDIA tags from content (including surrounding quote/backtick wrappers)
if media:

(File diff suppressed because it is too large.)


@@ -7982,12 +7982,15 @@ class GatewayRunner:
if _adapter:
_adapter_supports_edit = getattr(_adapter, "SUPPORTS_MESSAGE_EDITING", True)
_effective_cursor = _scfg.cursor if _adapter_supports_edit else ""
_buffer_only = False
if source.platform == Platform.MATRIX:
_effective_cursor = ""
_buffer_only = True
_consumer_cfg = StreamConsumerConfig(
edit_interval=_scfg.edit_interval,
buffer_threshold=_scfg.buffer_threshold,
cursor=_effective_cursor,
buffer_only=_buffer_only,
)
_stream_consumer = GatewayStreamConsumer(
adapter=_adapter,
@@ -8553,12 +8556,15 @@ class GatewayRunner:
# Some Matrix clients render the streaming cursor
# as a visible tofu/white-box artifact. Keep
# streaming text on Matrix, but suppress the cursor.
_buffer_only = False
if source.platform == Platform.MATRIX:
_effective_cursor = ""
_buffer_only = True
_consumer_cfg = StreamConsumerConfig(
edit_interval=_scfg.edit_interval,
buffer_threshold=_scfg.buffer_threshold,
cursor=_effective_cursor,
buffer_only=_buffer_only,
)
_stream_consumer = GatewayStreamConsumer(
adapter=_adapter,


@@ -43,6 +43,7 @@ class StreamConsumerConfig:
edit_interval: float = 1.0
buffer_threshold: int = 40
cursor: str = ""
buffer_only: bool = False
class GatewayStreamConsumer:
@@ -295,10 +296,13 @@ class GatewayStreamConsumer:
got_done
or got_segment_break
or commentary_text is not None
-or (elapsed >= self._current_edit_interval
-and self._accumulated)
-or len(self._accumulated) >= self.cfg.buffer_threshold
)
+if not self.cfg.buffer_only:
+should_edit = should_edit or (
+(elapsed >= self._current_edit_interval
+and self._accumulated)
+or len(self._accumulated) >= self.cfg.buffer_threshold
+)
current_update_visible = False
if should_edit and self._accumulated:


@@ -108,6 +108,9 @@ def _make_fake_mautrix():
def add_event_handler(self, event_type, handler):
self._event_handlers.setdefault(event_type, []).append(handler)
def add_dispatcher(self, dispatcher_type):
pass
class InternalEventType:
INVITE = "internal.invite"
@@ -115,6 +118,14 @@ def _make_fake_mautrix():
mautrix_client.InternalEventType = InternalEventType
mautrix.client = mautrix_client
# --- mautrix.client.dispatcher ---
mautrix_client_dispatcher = types.ModuleType("mautrix.client.dispatcher")
class MembershipEventDispatcher:
pass
mautrix_client_dispatcher.MembershipEventDispatcher = MembershipEventDispatcher
# --- mautrix.client.state_store ---
mautrix_client_state_store = types.ModuleType("mautrix.client.state_store")
@@ -163,6 +174,19 @@ def _make_fake_mautrix():
mautrix_crypto_store.MemoryCryptoStore = MemoryCryptoStore
# --- mautrix.crypto.attachments ---
mautrix_crypto_attachments = types.ModuleType("mautrix.crypto.attachments")
def encrypt_attachment(data):
encrypted_file = MagicMock()
encrypted_file.serialize.return_value = {
"key": {"k": "testkey"}, "iv": "testiv",
"hashes": {"sha256": "testhash"}, "v": "v2",
}
return (b"ciphertext_" + data, encrypted_file)
mautrix_crypto_attachments.encrypt_attachment = encrypt_attachment
# --- mautrix.crypto.store.asyncpg ---
mautrix_crypto_store_asyncpg = types.ModuleType("mautrix.crypto.store.asyncpg")
@@ -200,8 +224,10 @@ def _make_fake_mautrix():
"mautrix.api": mautrix_api,
"mautrix.types": mautrix_types,
"mautrix.client": mautrix_client,
"mautrix.client.dispatcher": mautrix_client_dispatcher,
"mautrix.client.state_store": mautrix_client_state_store,
"mautrix.crypto": mautrix_crypto,
"mautrix.crypto.attachments": mautrix_crypto_attachments,
"mautrix.crypto.store": mautrix_crypto_store,
"mautrix.crypto.store.asyncpg": mautrix_crypto_store_asyncpg,
"mautrix.util": mautrix_util,
@@ -357,6 +383,16 @@ class TestMatrixTypingIndicator:
timeout=0,
)
@pytest.mark.asyncio
async def test_stop_typing_no_client_is_noop(self):
self.adapter._client = None
await self.adapter.stop_typing("!room:example.org") # should not raise
@pytest.mark.asyncio
async def test_stop_typing_suppresses_exceptions(self):
self.adapter._client.set_typing = AsyncMock(side_effect=Exception("network"))
await self.adapter.stop_typing("!room:example.org") # should not raise
# ---------------------------------------------------------------------------
# mxc:// URL conversion
@@ -835,6 +871,41 @@ class TestMatrixAccessTokenAuth:
await adapter.disconnect()
class TestDeviceKeyReVerification:
@pytest.mark.asyncio
async def test_verify_fails_when_server_keys_mismatch_after_upload(self):
"""share_keys() succeeds but server still has old keys -> should return False."""
adapter = _make_adapter()
mock_client = MagicMock()
mock_client.mxid = "@bot:example.org"
mock_client.device_id = "TESTDEVICE"
# First query: keys missing -> triggers share_keys
# Second query: keys still don't match -> should fail
mock_keys_missing = MagicMock()
mock_keys_missing.device_keys = {"@bot:example.org": {}}
mock_keys_mismatch = MagicMock()
mock_device = MagicMock()
mock_device.keys = {"ed25519:TESTDEVICE": "server_old_key"}
mock_keys_mismatch.device_keys = {"@bot:example.org": {"TESTDEVICE": mock_device}}
mock_client.query_keys = AsyncMock(side_effect=[mock_keys_missing, mock_keys_mismatch])
mock_olm = MagicMock()
mock_olm.account = MagicMock()
mock_olm.account.shared = False
mock_olm.account.identity_keys = {"ed25519": "local_new_key"}
mock_olm.share_keys = AsyncMock()
from gateway.platforms.matrix import MatrixAdapter
result = await adapter._verify_device_keys_on_server(mock_client, mock_olm)
assert result is False
mock_olm.share_keys.assert_awaited_once()
class TestMatrixE2EEHardFail:
"""connect() must refuse to start when E2EE is requested but deps are missing."""
@@ -1139,6 +1210,56 @@ class TestMatrixSyncLoop:
mock_sync_store.put_next_batch.assert_awaited_once_with("s1234")
class TestMatrixUploadAndSend:
@pytest.mark.asyncio
async def test_upload_unencrypted_room_uses_plain_url(self):
"""Unencrypted rooms should use plain 'url' key."""
adapter = _make_adapter()
adapter._encryption = True
mock_client = MagicMock()
mock_client.crypto = object()
mock_client.state_store = MagicMock()
mock_client.state_store.is_encrypted = AsyncMock(return_value=False)
mock_client.upload_media = AsyncMock(return_value="mxc://example.org/plain")
mock_client.send_message_event = AsyncMock(return_value="$event")
adapter._client = mock_client
result = await adapter._upload_and_send(
"!room:example.org", b"hello", "test.txt", "text/plain", "m.file",
)
assert result.success is True
sent = mock_client.send_message_event.await_args.args[2]
assert sent["url"] == "mxc://example.org/plain"
assert "file" not in sent
@pytest.mark.asyncio
async def test_upload_encrypted_room_uses_file_payload(self):
"""Encrypted rooms should use 'file' key with crypto metadata."""
adapter = _make_adapter()
adapter._encryption = True
mock_client = MagicMock()
mock_client.crypto = object()
mock_client.state_store = MagicMock()
mock_client.state_store.is_encrypted = AsyncMock(return_value=True)
mock_client.upload_media = AsyncMock(return_value="mxc://example.org/enc")
mock_client.send_message_event = AsyncMock(return_value="$event")
adapter._client = mock_client
result = await adapter._upload_and_send(
"!room:example.org", b"secret", "secret.txt", "text/plain", "m.file",
)
assert result.success is True
# Should have uploaded ciphertext, not plaintext
uploaded_data = mock_client.upload_media.await_args.args[0]
assert uploaded_data != b"secret"
sent = mock_client.send_message_event.await_args.args[2]
assert "url" not in sent
assert "file" in sent
assert sent["file"]["url"] == "mxc://example.org/enc"
class TestMatrixEncryptedSendFallback:
@pytest.mark.asyncio
async def test_send_retries_after_e2ee_error(self):
@@ -1165,128 +1286,24 @@ class TestMatrixEncryptedSendFallback:
# ---------------------------------------------------------------------------
-# E2EE: MegolmEvent key request + buffering via _on_encrypted_event
+# E2EE: _joined_rooms reference preservation for CryptoStateStore
# ---------------------------------------------------------------------------
-class TestMatrixMegolmEventHandling:
-@pytest.mark.asyncio
-async def test_encrypted_event_buffers_for_retry(self):
-"""_on_encrypted_event should buffer undecrypted events for retry."""
-adapter = _make_adapter()
-adapter._user_id = "@bot:example.org"
-adapter._startup_ts = 0.0
-adapter._dm_rooms = {}
+class TestJoinedRoomsReference:
+def test_joined_rooms_reference_preserved_after_reassignment(self):
+"""_CryptoStateStore must see updates after initial sync populates rooms."""
+from gateway.platforms.matrix import _CryptoStateStore
-fake_event = MagicMock()
-fake_event.room_id = "!room:example.org"
-fake_event.event_id = "$encrypted_event"
-fake_event.sender = "@alice:example.org"
+joined = set()
+store = _CryptoStateStore(MagicMock(), joined)
-await adapter._on_encrypted_event(fake_event)
+# Simulate what connect() should do: mutate in place, not reassign.
+joined.clear()
+joined.update(["!room1:example.org", "!room2:example.org"])
-# Should have buffered the event
-assert len(adapter._pending_megolm) == 1
-room_id, event, ts = adapter._pending_megolm[0]
-assert room_id == "!room:example.org"
-assert event is fake_event
-@pytest.mark.asyncio
-async def test_encrypted_event_buffer_capped(self):
-"""Buffer should not grow past _MAX_PENDING_EVENTS."""
-adapter = _make_adapter()
-adapter._user_id = "@bot:example.org"
-adapter._startup_ts = 0.0
-adapter._dm_rooms = {}
-from gateway.platforms.matrix import _MAX_PENDING_EVENTS
-for i in range(_MAX_PENDING_EVENTS + 10):
-evt = MagicMock()
-evt.room_id = "!room:example.org"
-evt.event_id = f"$event_{i}"
-evt.sender = "@alice:example.org"
-await adapter._on_encrypted_event(evt)
-assert len(adapter._pending_megolm) == _MAX_PENDING_EVENTS
-# ---------------------------------------------------------------------------
-# E2EE: Retry pending decryptions
-# ---------------------------------------------------------------------------
-class TestMatrixRetryPendingDecryptions:
-@pytest.mark.asyncio
-async def test_successful_decryption_routes_to_handler(self):
-adapter = _make_adapter()
-adapter._user_id = "@bot:example.org"
-adapter._startup_ts = 0.0
-adapter._dm_rooms = {}
-fake_encrypted = MagicMock()
-fake_encrypted.event_id = "$encrypted"
-decrypted_event = MagicMock()
-mock_crypto = MagicMock()
-mock_crypto.decrypt_megolm_event = AsyncMock(return_value=decrypted_event)
-fake_client = MagicMock()
-fake_client.crypto = mock_crypto
-adapter._client = fake_client
-now = time.time()
-adapter._pending_megolm = [("!room:ex.org", fake_encrypted, now)]
-with patch.object(adapter, "_on_room_message", AsyncMock()) as mock_handler:
-await adapter._retry_pending_decryptions()
-mock_handler.assert_awaited_once_with(decrypted_event)
-# Buffer should be empty now
-assert len(adapter._pending_megolm) == 0
-@pytest.mark.asyncio
-async def test_still_undecryptable_stays_in_buffer(self):
-adapter = _make_adapter()
-fake_encrypted = MagicMock()
-fake_encrypted.event_id = "$still_encrypted"
-mock_crypto = MagicMock()
-mock_crypto.decrypt_megolm_event = AsyncMock(side_effect=Exception("missing key"))
-fake_client = MagicMock()
-fake_client.crypto = mock_crypto
-adapter._client = fake_client
-now = time.time()
-adapter._pending_megolm = [("!room:ex.org", fake_encrypted, now)]
-await adapter._retry_pending_decryptions()
-assert len(adapter._pending_megolm) == 1
-@pytest.mark.asyncio
-async def test_expired_events_dropped(self):
-adapter = _make_adapter()
-from gateway.platforms.matrix import _PENDING_EVENT_TTL
-fake_event = MagicMock()
-fake_event.event_id = "$old_event"
-mock_crypto = MagicMock()
-fake_client = MagicMock()
-fake_client.crypto = mock_crypto
-adapter._client = fake_client
-# Timestamp well past TTL
-old_ts = time.time() - _PENDING_EVENT_TTL - 60
-adapter._pending_megolm = [("!room:ex.org", fake_event, old_ts)]
-await adapter._retry_pending_decryptions()
-# Should have been dropped
-assert len(adapter._pending_megolm) == 0
+import asyncio
+rooms = asyncio.get_event_loop().run_until_complete(store.find_shared_rooms("@user:ex"))
+assert set(rooms) == {"!room1:example.org", "!room2:example.org"}
# ---------------------------------------------------------------------------
@@ -1354,11 +1371,70 @@ class TestMatrixEncryptedEventHandler:
handler_calls = mock_client.add_event_handler.call_args_list
registered_types = [call.args[0] for call in handler_calls]
-# Should have registered handlers for ROOM_MESSAGE, REACTION, INVITE, and ROOM_ENCRYPTED
-assert len(handler_calls) >= 4 # At minimum these four
+# Should have registered handlers for ROOM_MESSAGE, REACTION, INVITE
+assert len(handler_calls) >= 3
await adapter.disconnect()
@pytest.mark.asyncio
async def test_connect_fails_on_stale_otk_conflict(self):
"""connect() must refuse E2EE when OTK upload hits 'already exists'."""
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test_token",
extra={
"homeserver": "https://matrix.example.org",
"user_id": "@bot:example.org",
"encryption": True,
},
)
adapter = MatrixAdapter(config)
fake_mautrix_mods = _make_fake_mautrix()
mock_client = MagicMock()
mock_client.mxid = "@bot:example.org"
mock_client.device_id = None
mock_client.state_store = MagicMock()
mock_client.sync_store = MagicMock()
mock_client.crypto = None
mock_client.whoami = AsyncMock(return_value=MagicMock(user_id="@bot:example.org", device_id="DEV123"))
mock_client.add_event_handler = MagicMock()
mock_client.add_dispatcher = MagicMock()
mock_client.query_keys = AsyncMock(return_value={
"device_keys": {"@bot:example.org": {"DEV123": {
"keys": {"ed25519:DEV123": "fake_ed25519_key"},
}}},
})
mock_client.api = MagicMock()
mock_client.api.token = "syt_test_token"
mock_client.api.session = MagicMock()
mock_client.api.session.close = AsyncMock()
# share_keys succeeds on first call (from _verify_device_keys_on_server),
# then raises "already exists" on the proactive OTK flush in connect().
mock_olm = MagicMock()
mock_olm.load = AsyncMock()
mock_olm.share_keys = AsyncMock(
side_effect=[None, Exception("One time key signed_curve25519:AAAAAQ already exists")]
)
mock_olm.share_keys_min_trust = None
mock_olm.send_keys_min_trust = None
mock_olm.account = MagicMock()
mock_olm.account.identity_keys = {"ed25519": "fake_ed25519_key"}
fake_mautrix_mods["mautrix.client"].Client = MagicMock(return_value=mock_client)
fake_mautrix_mods["mautrix.crypto"].OlmMachine = MagicMock(return_value=mock_olm)
from gateway.platforms import matrix as matrix_mod
with patch.object(matrix_mod, "_check_e2ee_deps", return_value=True):
with patch.dict("sys.modules", fake_mautrix_mods):
result = await adapter.connect()
assert result is False
# ---------------------------------------------------------------------------
# Disconnect


@@ -10,7 +10,6 @@ import pytest
from gateway.config import PlatformConfig
# The matrix adapter module is importable without mautrix installed
# (module-level imports use try/except with stubs). No need for
# module-level mock installation — tests that call adapter methods
@@ -159,9 +158,15 @@ class TestStripMention:
result = self.adapter._strip_mention("@hermes:example.org help me")
assert result == "help me"
-def test_strip_localpart(self):
+def test_localpart_preserved(self):
+"""Localpart-only text is no longer stripped — avoids false positives in paths."""
result = self.adapter._strip_mention("hermes help me")
-assert result == "help me"
+assert result == "hermes help me"
+def test_localpart_in_path_preserved(self):
+"""Localpart inside a file path must not be damaged."""
+result = self.adapter._strip_mention("read /home/hermes/config.yaml")
+assert result == "read /home/hermes/config.yaml"
def test_strip_returns_empty_for_mention_only(self):
result = self.adapter._strip_mention("@hermes:example.org")
@@ -273,8 +278,8 @@ async def test_require_mention_dm_always_responds(monkeypatch):
@pytest.mark.asyncio
-async def test_dm_strips_mention(monkeypatch):
-"""DMs strip mention from body, matching Discord behavior."""
+async def test_dm_strips_full_mxid(monkeypatch):
+"""DMs strip the full MXID from body when require_mention is on (default)."""
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
@@ -289,6 +294,23 @@ async def test_dm_strips_mention(monkeypatch):
assert msg.text == "help me"
@pytest.mark.asyncio
async def test_dm_preserves_localpart_in_body(monkeypatch):
"""DMs no longer strip bare localpart — only the full MXID is removed."""
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
adapter = _make_adapter()
_set_dm(adapter)
event = _make_event("hermes help me")
await adapter._on_room_message(event)
adapter.handle_message.assert_awaited_once()
msg = adapter.handle_message.await_args.args[0]
assert msg.text == "hermes help me"
@pytest.mark.asyncio
async def test_bare_mention_passes_empty_string(monkeypatch):
"""A message that is only a mention should pass through as empty, not be dropped."""
@@ -309,7 +331,9 @@ async def test_bare_mention_passes_empty_string(monkeypatch):
async def test_require_mention_free_response_room(monkeypatch):
"""Free-response rooms bypass mention requirement."""
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
monkeypatch.setenv("MATRIX_FREE_RESPONSE_ROOMS", "!room1:example.org,!room2:example.org")
monkeypatch.setenv(
"MATRIX_FREE_RESPONSE_ROOMS", "!room1:example.org,!room2:example.org"
)
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
adapter = _make_adapter()
@@ -351,6 +375,22 @@ async def test_require_mention_disabled(monkeypatch):
assert msg.text == "hello without mention"
@pytest.mark.asyncio
async def test_require_mention_disabled_skips_stripping(monkeypatch):
"""MATRIX_REQUIRE_MENTION=false: mention text is NOT stripped from body."""
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "false")
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
adapter = _make_adapter()
event = _make_event("@hermes:example.org help me")
await adapter._on_room_message(event)
adapter.handle_message.assert_awaited_once()
msg = adapter.handle_message.await_args.args[0]
assert msg.text == "@hermes:example.org help me"
# ---------------------------------------------------------------------------
# Auto-thread in _on_room_message
# ---------------------------------------------------------------------------
@@ -442,8 +482,10 @@ class TestThreadPersistence:
def test_empty_state_file(self, tmp_path, monkeypatch):
"""No state file → empty set."""
from gateway.platforms.helpers import ThreadParticipationTracker
monkeypatch.setattr(
ThreadParticipationTracker, "_state_path",
ThreadParticipationTracker,
"_state_path",
lambda self: tmp_path / "matrix_threads.json",
)
adapter = _make_adapter()
@@ -452,9 +494,11 @@ class TestThreadPersistence:
def test_track_thread_persists(self, tmp_path, monkeypatch):
"""mark() writes to disk."""
from gateway.platforms.helpers import ThreadParticipationTracker
state_path = tmp_path / "matrix_threads.json"
monkeypatch.setattr(
ThreadParticipationTracker, "_state_path",
ThreadParticipationTracker,
"_state_path",
lambda self: state_path,
)
adapter = _make_adapter()
@@ -466,10 +510,12 @@ class TestThreadPersistence:
def test_threads_survive_reload(self, tmp_path, monkeypatch):
"""Persisted threads are loaded by a new adapter instance."""
from gateway.platforms.helpers import ThreadParticipationTracker
state_path = tmp_path / "matrix_threads.json"
state_path.write_text(json.dumps(["$t1", "$t2"]))
monkeypatch.setattr(
ThreadParticipationTracker, "_state_path",
ThreadParticipationTracker,
"_state_path",
lambda self: state_path,
)
adapter = _make_adapter()
@@ -479,9 +525,11 @@ class TestThreadPersistence:
def test_cap_max_tracked_threads(self, tmp_path, monkeypatch):
"""Thread set is trimmed to max_tracked."""
from gateway.platforms.helpers import ThreadParticipationTracker
state_path = tmp_path / "matrix_threads.json"
monkeypatch.setattr(
ThreadParticipationTracker, "_state_path",
ThreadParticipationTracker,
"_state_path",
lambda self: state_path,
)
adapter = _make_adapter()
@@ -604,6 +652,7 @@ class TestMatrixConfigBridge:
}
import os
import yaml
config_file = tmp_path / "config.yaml"
@@ -613,18 +662,27 @@ class TestMatrixConfigBridge:
yaml_cfg = yaml.safe_load(config_file.read_text())
matrix_cfg = yaml_cfg.get("matrix", {})
if isinstance(matrix_cfg, dict):
if "require_mention" in matrix_cfg and not os.getenv("MATRIX_REQUIRE_MENTION"):
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", str(matrix_cfg["require_mention"]).lower())
if "require_mention" in matrix_cfg and not os.getenv(
"MATRIX_REQUIRE_MENTION"
):
monkeypatch.setenv(
"MATRIX_REQUIRE_MENTION", str(matrix_cfg["require_mention"]).lower()
)
frc = matrix_cfg.get("free_response_rooms")
if frc is not None and not os.getenv("MATRIX_FREE_RESPONSE_ROOMS"):
if isinstance(frc, list):
frc = ",".join(str(v) for v in frc)
monkeypatch.setenv("MATRIX_FREE_RESPONSE_ROOMS", str(frc))
if "auto_thread" in matrix_cfg and not os.getenv("MATRIX_AUTO_THREAD"):
monkeypatch.setenv("MATRIX_AUTO_THREAD", str(matrix_cfg["auto_thread"]).lower())
monkeypatch.setenv(
"MATRIX_AUTO_THREAD", str(matrix_cfg["auto_thread"]).lower()
)
assert os.getenv("MATRIX_REQUIRE_MENTION") == "false"
assert os.getenv("MATRIX_FREE_RESPONSE_ROOMS") == "!room1:example.org,!room2:example.org"
assert (
os.getenv("MATRIX_FREE_RESPONSE_ROOMS")
== "!room1:example.org,!room2:example.org"
)
assert os.getenv("MATRIX_AUTO_THREAD") == "false"
def test_yaml_bridge_sets_dm_mention_threads(self, monkeypatch, tmp_path):
@@ -632,6 +690,7 @@ class TestMatrixConfigBridge:
monkeypatch.delenv("MATRIX_DM_MENTION_THREADS", raising=False)
import os
import yaml
yaml_content = {"matrix": {"dm_mention_threads": True}}
@@ -641,8 +700,13 @@ class TestMatrixConfigBridge:
yaml_cfg = yaml.safe_load(config_file.read_text())
matrix_cfg = yaml_cfg.get("matrix", {})
if isinstance(matrix_cfg, dict):
if "dm_mention_threads" in matrix_cfg and not os.getenv("MATRIX_DM_MENTION_THREADS"):
monkeypatch.setenv("MATRIX_DM_MENTION_THREADS", str(matrix_cfg["dm_mention_threads"]).lower())
if "dm_mention_threads" in matrix_cfg and not os.getenv(
"MATRIX_DM_MENTION_THREADS"
):
monkeypatch.setenv(
"MATRIX_DM_MENTION_THREADS",
str(matrix_cfg["dm_mention_threads"]).lower(),
)
assert os.getenv("MATRIX_DM_MENTION_THREADS") == "true"
@@ -651,9 +715,12 @@ class TestMatrixConfigBridge:
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "true")
import os
yaml_cfg = {"matrix": {"require_mention": False}}
matrix_cfg = yaml_cfg.get("matrix", {})
if "require_mention" in matrix_cfg and not os.getenv("MATRIX_REQUIRE_MENTION"):
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", str(matrix_cfg["require_mention"]).lower())
monkeypatch.setenv(
"MATRIX_REQUIRE_MENTION", str(matrix_cfg["require_mention"]).lower()
)
assert os.getenv("MATRIX_REQUIRE_MENTION") == "true"


@@ -1013,3 +1013,106 @@ class TestFilterAndAccumulateIntegration:
await task
except asyncio.CancelledError:
pass
# ── buffer_only mode tests ─────────────────────────────────────────────
class TestBufferOnlyMode:
"""Verify buffer_only mode suppresses intermediate edits and only
flushes on structural boundaries (done, segment break, commentary)."""
@pytest.mark.asyncio
async def test_suppresses_intermediate_edits(self):
"""Time-based and size-based edits are skipped; only got_done flushes."""
adapter = MagicMock()
adapter.MAX_MESSAGE_LENGTH = 4096
adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg1"))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
cfg = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor="", buffer_only=True)
consumer = GatewayStreamConsumer(adapter, "!room:server", config=cfg)
for word in ["Hello", " world", ", this", " is", " a", " test"]:
consumer.on_delta(word)
consumer.finish()
await consumer.run()
adapter.send.assert_called_once()
adapter.edit_message.assert_not_called()
assert "Hello world, this is a test" in adapter.send.call_args_list[0][1]["content"]
@pytest.mark.asyncio
async def test_flushes_on_segment_break(self):
"""A segment break (tool call boundary) flushes accumulated text."""
adapter = MagicMock()
adapter.MAX_MESSAGE_LENGTH = 4096
adapter.send = AsyncMock(side_effect=[
SimpleNamespace(success=True, message_id="msg1"),
SimpleNamespace(success=True, message_id="msg2"),
])
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
cfg = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor="", buffer_only=True)
consumer = GatewayStreamConsumer(adapter, "!room:server", config=cfg)
consumer.on_delta("Before tool call")
consumer.on_delta(None)
consumer.on_delta("After tool call")
consumer.finish()
await consumer.run()
assert adapter.send.call_count == 2
assert "Before tool call" in adapter.send.call_args_list[0][1]["content"]
assert "After tool call" in adapter.send.call_args_list[1][1]["content"]
adapter.edit_message.assert_not_called()
@pytest.mark.asyncio
async def test_flushes_on_commentary(self):
"""An interim commentary message flushes in buffer_only mode."""
adapter = MagicMock()
adapter.MAX_MESSAGE_LENGTH = 4096
adapter.send = AsyncMock(side_effect=[
SimpleNamespace(success=True, message_id="msg1"),
SimpleNamespace(success=True, message_id="msg2"),
SimpleNamespace(success=True, message_id="msg3"),
])
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
cfg = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor="", buffer_only=True)
consumer = GatewayStreamConsumer(adapter, "!room:server", config=cfg)
consumer.on_delta("Working on it...")
consumer.on_commentary("I'll search for that first.")
consumer.on_delta("Here are the results.")
consumer.finish()
await consumer.run()
# Three sends: accumulated text, commentary, final text
assert adapter.send.call_count >= 2
adapter.edit_message.assert_not_called()
@pytest.mark.asyncio
async def test_default_mode_still_triggers_intermediate_edits(self):
"""Regression: buffer_only=False (default) still does progressive edits."""
adapter = MagicMock()
adapter.MAX_MESSAGE_LENGTH = 4096
adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg1"))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
# buffer_threshold=5 means any 5+ chars triggers an early edit
cfg = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor="")
consumer = GatewayStreamConsumer(adapter, "!room:server", config=cfg)
consumer.on_delta("Hello world, this is long enough to trigger edits")
consumer.finish()
await consumer.run()
# Should have at least one send. With buffer_threshold=5 and this much
# text, the consumer may send then edit, or just send once at got_done.
# The key assertion: this doesn't break.
assert adapter.send.call_count >= 1


@@ -284,8 +284,40 @@ MATRIX_RECOVERY_KEY=EsT... your recovery key here
On each startup, if `MATRIX_RECOVERY_KEY` is set, Hermes imports cross-signing keys from the homeserver's secure secret storage and signs the current device. This is idempotent and safe to leave enabled permanently.
:::warning
If you delete the `~/.hermes/platforms/matrix/store/` directory, the bot loses its encryption keys. You'll need to verify the device again in your Matrix client. Back up this directory if you want to preserve encrypted sessions.
:::warning[Deleting the crypto store]
If you delete `~/.hermes/platforms/matrix/store/crypto.db`, the bot loses its encryption identity. Simply restarting with the same device ID will **not** fully recover — the homeserver still holds one-time keys signed with the old identity key, and peers cannot establish new Olm sessions.
Hermes detects this condition on startup and refuses to enable E2EE, logging: `device XXXX has stale one-time keys on the server signed with a previous identity key`.
**Easiest recovery: generate a new access token** (which gets a fresh device ID with no stale key history). See the "Upgrading from a previous version with E2EE" section below. This is the most reliable path and avoids touching the homeserver database.
**Manual recovery** (advanced — keeps the same device ID):
1. Stop Synapse and delete the old device from its database:
```bash
sudo systemctl stop matrix-synapse
sudo sqlite3 /var/lib/matrix-synapse/homeserver.db "
DELETE FROM e2e_device_keys_json WHERE device_id = 'DEVICE_ID' AND user_id = '@hermes:your-server';
DELETE FROM e2e_one_time_keys_json WHERE device_id = 'DEVICE_ID' AND user_id = '@hermes:your-server';
DELETE FROM e2e_fallback_keys_json WHERE device_id = 'DEVICE_ID' AND user_id = '@hermes:your-server';
DELETE FROM devices WHERE device_id = 'DEVICE_ID' AND user_id = '@hermes:your-server';
"
sudo systemctl start matrix-synapse
```
Or via the Synapse admin API (note the URL-encoded user ID):
```bash
curl -X DELETE -H "Authorization: Bearer ADMIN_TOKEN" \
'https://your-server/_synapse/admin/v2/users/%40hermes%3Ayour-server/devices/DEVICE_ID'
```
Note: deleting a device via the admin API may also invalidate the associated access token. You may need to generate a new token afterward.
2. Delete the local crypto store and restart Hermes:
```bash
rm -f ~/.hermes/platforms/matrix/store/crypto.db*
# restart hermes
```
Other Matrix clients (Element, matrix-commander) may cache the old device keys. After recovery, type `/discardsession` in Element to force a new encryption session with the bot.
:::
:::info
@@ -361,6 +393,10 @@ pip install 'hermes-agent[matrix]'
### Upgrading from a previous version with E2EE
:::tip
If you also manually deleted `crypto.db`, see the "Deleting the crypto store" warning in the E2EE section above — there are additional steps to clear stale one-time keys from the homeserver.
:::
If you previously used Hermes with `MATRIX_ENCRYPTION=true` and are upgrading to
a version that uses the new SQLite-based crypto store, the bot's encryption
identity has changed. Your Matrix client (Element) may cache the old device keys