From 89c812d1d2839e7fd4b3901c63331b488644e471 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sun, 5 Apr 2026 19:46:58 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20shared=20thread=20sessions=20by=20defau?= =?UTF-8?q?lt=20=E2=80=94=20multi-user=20thread=20support=20(#5391)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Threads (Telegram forum topics, Discord threads, Slack threads) now default to shared sessions where all participants see the same conversation. This is the expected UX for threaded conversations where multiple users @mention the bot and interact collaboratively. Changes: - build_session_key(): when thread_id is present, user_id is no longer appended to the session key (threads are shared by default) - New config: thread_sessions_per_user (default: false) — opt-in to restore per-user isolation in threads if needed - Sender attribution: messages in shared threads are prefixed with [sender name] so the agent can tell participants apart - System prompt: shared threads show 'Multi-user thread' note instead of a per-turn User line (avoids busting prompt cache) - Wired through all callers: gateway/run.py, base.py, telegram.py, feishu.py - Regular group messages (no thread) remain per-user isolated (unchanged) - DM threads are unaffected (they have their own keying logic) Closes community request from demontut_ re: thread-based shared sessions. --- gateway/config.py | 7 ++ gateway/platforms/base.py | 1 + gateway/platforms/feishu.py | 2 + gateway/platforms/telegram.py | 2 + gateway/run.py | 22 ++++++ gateway/session.py | 41 ++++++++-- tests/gateway/test_config.py | 26 +++++++ tests/gateway/test_session.py | 139 +++++++++++++++++++++++++++++++++- 8 files changed, 233 insertions(+), 7 deletions(-) diff --git a/gateway/config.py b/gateway/config.py index fec050b92d..0ff3127ce1 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -246,6 +246,7 @@ class GatewayConfig: # Session isolation in shared chats group_sessions_per_user: bool = True # Isolate group/channel sessions per participant when user IDs are available + thread_sessions_per_user: bool = False # When False (default), threads are shared across all participants # Unauthorized DM policy unauthorized_dm_behavior: str = "pair" # "pair" or "ignore" @@ -333,6 +334,7 @@ class GatewayConfig: "always_log_local": self.always_log_local, "stt_enabled": self.stt_enabled, "group_sessions_per_user": self.group_sessions_per_user, + "thread_sessions_per_user": self.thread_sessions_per_user, "unauthorized_dm_behavior": self.unauthorized_dm_behavior, "streaming": self.streaming.to_dict(), } @@ -376,6 +378,7 @@ class GatewayConfig: stt_enabled = data.get("stt", {}).get("enabled") if isinstance(data.get("stt"), dict) else None group_sessions_per_user = data.get("group_sessions_per_user") + thread_sessions_per_user = data.get("thread_sessions_per_user") unauthorized_dm_behavior = _normalize_unauthorized_dm_behavior( data.get("unauthorized_dm_behavior"), "pair", @@ -392,6 +395,7 @@ class GatewayConfig: always_log_local=data.get("always_log_local", True), stt_enabled=_coerce_bool(stt_enabled, True), group_sessions_per_user=_coerce_bool(group_sessions_per_user, True), + thread_sessions_per_user=_coerce_bool(thread_sessions_per_user, False), unauthorized_dm_behavior=unauthorized_dm_behavior, streaming=StreamingConfig.from_dict(data.get("streaming", {})), ) @@ -467,6 +471,9 @@ def load_gateway_config() -> GatewayConfig: if "group_sessions_per_user" in yaml_cfg: gw_data["group_sessions_per_user"] = yaml_cfg["group_sessions_per_user"] + if "thread_sessions_per_user" in yaml_cfg: + gw_data["thread_sessions_per_user"] = yaml_cfg["thread_sessions_per_user"] + streaming_cfg = yaml_cfg.get("streaming") if isinstance(streaming_cfg, dict): gw_data["streaming"] = streaming_cfg diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 98ea4a6b63..5261aceea5 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -1038,6 +1038,7 @@ class BasePlatformAdapter(ABC): session_key = build_session_key( event.source, group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True), + thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False), ) # Check if there's already an active handler for this session diff --git a/gateway/platforms/feishu.py b/gateway/platforms/feishu.py index d9aaae9a74..bee8b01d8a 100644 --- a/gateway/platforms/feishu.py +++ b/gateway/platforms/feishu.py @@ -1887,6 +1887,7 @@ class FeishuAdapter(BasePlatformAdapter): session_key = build_session_key( event.source, group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True), + thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False), ) return f"{session_key}:media:{event.message_type.value}" @@ -2163,6 +2164,7 @@ class FeishuAdapter(BasePlatformAdapter): return build_session_key( event.source, group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True), + thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False), ) @staticmethod diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 524324c8d6..b463870365 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -1711,6 +1711,7 @@ class TelegramAdapter(BasePlatformAdapter): return build_session_key( event.source, group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True), + thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False), ) def _enqueue_text_event(self, event: MessageEvent) -> None: @@ -1769,6 +1770,7 @@ class TelegramAdapter(BasePlatformAdapter): session_key = build_session_key( event.source, group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True), + thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False), ) media_group_id = getattr(msg, "media_group_id", None) if media_group_id: diff --git a/gateway/run.py b/gateway/run.py index 19eecaec46..ee1de5174b 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -770,6 +770,7 @@ class GatewayRunner: return build_session_key( source, group_sessions_per_user=getattr(config, "group_sessions_per_user", True), + thread_sessions_per_user=getattr(config, "thread_sessions_per_user", False), ) def _resolve_turn_agent_config(self, user_message: str, model: str, runtime_kwargs: dict) -> dict: @@ -1498,6 +1499,10 @@ class GatewayRunner: "group_sessions_per_user", self.config.group_sessions_per_user, ) + config.extra.setdefault( + "thread_sessions_per_user", + getattr(self.config, "thread_sessions_per_user", False), + ) if platform == Platform.TELEGRAM: from gateway.platforms.telegram import TelegramAdapter, check_telegram_requirements @@ -2662,6 +2667,23 @@ class GatewayRunner: # tool even when they appear in the same message. # ----------------------------------------------------------------- message_text = event.text or "" + + # ----------------------------------------------------------------- + # Sender attribution for shared thread sessions. + # + # When multiple users share a single thread session (the default for + # threads), prefix each message with [sender name] so the agent can + # tell participants apart. Skip for DMs (single-user by nature) and + # when per-user thread isolation is explicitly enabled. + # ----------------------------------------------------------------- + _is_shared_thread = ( + source.chat_type != "dm" + and source.thread_id + and not getattr(self.config, "thread_sessions_per_user", False) + ) + if _is_shared_thread and source.user_name: + message_text = f"[{source.user_name}] {message_text}" + if event.media_urls: image_paths = [] for i, path in enumerate(event.media_urls): diff --git a/gateway/session.py b/gateway/session.py index c3b913ef81..64f04ad9c9 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -254,8 +254,22 @@ def build_session_context_prompt( if context.source.chat_topic: lines.append(f"**Channel Topic:** {context.source.chat_topic}") - # User identity (especially useful for WhatsApp where multiple people DM) - if context.source.user_name: + # User identity. + # In shared thread sessions (non-DM with thread_id), multiple users + # contribute to the same conversation. Don't pin a single user name + # in the system prompt — it changes per-turn and would bust the prompt + # cache. Instead, note that this is a multi-user thread; individual + # sender names are prefixed on each user message by the gateway. + _is_shared_thread = ( + context.source.chat_type != "dm" + and context.source.thread_id + ) + if _is_shared_thread: + lines.append( + "**Session type:** Multi-user thread — messages are prefixed " + "with [sender name]. Multiple users may participate." + ) + elif context.source.user_name: lines.append(f"**User:** {context.source.user_name}") elif context.source.user_id: uid = context.source.user_id @@ -427,7 +441,11 @@ class SessionEntry: ) -def build_session_key(source: SessionSource, group_sessions_per_user: bool = True) -> str: +def build_session_key( + source: SessionSource, + group_sessions_per_user: bool = True, + thread_sessions_per_user: bool = False, +) -> str: """Build a deterministic session key from a message source. This is the single source of truth for session key construction. @@ -442,7 +460,11 @@ def build_session_key(source: SessionSource, group_sessions_per_user: bool = Tru - chat_id identifies the parent group/channel. - user_id/user_id_alt isolates participants within that parent chat when available when ``group_sessions_per_user`` is enabled. - - thread_id differentiates threads within that parent chat. + - thread_id differentiates threads within that parent chat. When + ``thread_sessions_per_user`` is False (default), threads are *shared* across all + participants — user_id is NOT appended, so every user in the thread + shares a single session. This is the expected UX for threaded + conversations (Telegram forum topics, Discord threads, Slack threads). - Without participant identifiers, or when isolation is disabled, messages fall back to one shared session per chat. - Without identifiers, messages fall back to one session per platform/chat_type. @@ -464,7 +486,15 @@ def build_session_key(source: SessionSource, group_sessions_per_user: bool = Tru key_parts.append(source.chat_id) if source.thread_id: key_parts.append(source.thread_id) - if group_sessions_per_user and participant_id: + + # In threads, default to shared sessions (all participants see the same + # conversation). Per-user isolation only applies when explicitly enabled + # via thread_sessions_per_user, or when there is no thread (regular group). + isolate_user = group_sessions_per_user + if source.thread_id and not thread_sessions_per_user: + isolate_user = False + + if isolate_user and participant_id: key_parts.append(str(participant_id)) return ":".join(key_parts) @@ -552,6 +582,7 @@ class SessionStore: return build_session_key( source, group_sessions_per_user=getattr(self.config, "group_sessions_per_user", True), + thread_sessions_per_user=getattr(self.config, "thread_sessions_per_user", False), ) def _is_session_expired(self, entry: SessionEntry) -> bool: diff --git a/tests/gateway/test_config.py b/tests/gateway/test_config.py index 8f24faa995..c08e263dd0 100644 --- a/tests/gateway/test_config.py +++ b/tests/gateway/test_config.py @@ -109,6 +109,7 @@ class TestGatewayConfigRoundtrip: reset_triggers=["/new"], quick_commands={"limits": {"type": "exec", "command": "echo ok"}}, group_sessions_per_user=False, + thread_sessions_per_user=True, ) d = config.to_dict() restored = GatewayConfig.from_dict(d) @@ -118,6 +119,7 @@ class TestGatewayConfigRoundtrip: assert restored.reset_triggers == ["/new"] assert restored.quick_commands == {"limits": {"type": "exec", "command": "echo ok"}} assert restored.group_sessions_per_user is False + assert restored.thread_sessions_per_user is True def test_roundtrip_preserves_unauthorized_dm_behavior(self): config = GatewayConfig( @@ -167,6 +169,30 @@ class TestLoadGatewayConfig: assert config.group_sessions_per_user is False + def test_bridges_thread_sessions_per_user_from_config_yaml(self, tmp_path, monkeypatch): + hermes_home = tmp_path / ".hermes" + hermes_home.mkdir() + config_path = hermes_home / "config.yaml" + config_path.write_text("thread_sessions_per_user: true\n", encoding="utf-8") + + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + + config = load_gateway_config() + + assert config.thread_sessions_per_user is True + + def test_thread_sessions_per_user_defaults_to_false(self, tmp_path, monkeypatch): + hermes_home = tmp_path / ".hermes" + hermes_home.mkdir() + config_path = hermes_home / "config.yaml" + config_path.write_text("{}\n", encoding="utf-8") + + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + + config = load_gateway_config() + + assert config.thread_sessions_per_user is False + def test_invalid_quick_commands_in_config_yaml_are_ignored(self, tmp_path, monkeypatch): hermes_home = tmp_path / ".hermes" hermes_home.mkdir() diff --git a/tests/gateway/test_session.py b/tests/gateway/test_session.py index 77d4993ee3..d1acbda016 100644 --- a/tests/gateway/test_session.py +++ b/tests/gateway/test_session.py @@ -291,6 +291,69 @@ class TestBuildSessionContextPrompt: assert "WhatsApp" in prompt or "whatsapp" in prompt.lower() + def test_multi_user_thread_prompt(self): + """Shared thread sessions show multi-user note instead of single user.""" + config = GatewayConfig( + platforms={ + Platform.TELEGRAM: PlatformConfig(enabled=True, token="fake"), + }, + ) + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id="-1002285219667", + chat_name="Test Group", + chat_type="group", + thread_id="17585", + user_name="Alice", + ) + ctx = build_session_context(source, config) + prompt = build_session_context_prompt(ctx) + + assert "Multi-user thread" in prompt + assert "[sender name]" in prompt + # Should NOT show a specific **User:** line (would bust cache) + assert "**User:** Alice" not in prompt + + def test_non_thread_group_shows_user(self): + """Regular group messages (no thread) still show the user name.""" + config = GatewayConfig( + platforms={ + Platform.TELEGRAM: PlatformConfig(enabled=True, token="fake"), + }, + ) + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id="-1002285219667", + chat_name="Test Group", + chat_type="group", + user_name="Alice", + ) + ctx = build_session_context(source, config) + prompt = build_session_context_prompt(ctx) + + assert "**User:** Alice" in prompt + assert "Multi-user thread" not in prompt + + def test_dm_thread_shows_user_not_multi(self): + """DM threads are single-user and should show User, not multi-user note.""" + config = GatewayConfig( + platforms={ + Platform.TELEGRAM: PlatformConfig(enabled=True, token="fake"), + }, + ) + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id="99", + chat_type="dm", + thread_id="topic-1", + user_name="Alice", + ) + ctx = build_session_context(source, config) + prompt = build_session_context_prompt(ctx) + + assert "**User:** Alice" in prompt + assert "Multi-user thread" not in prompt + class TestSessionStoreRewriteTranscript: """Regression: /retry and /undo must persist truncated history to disk.""" @@ -636,7 +699,28 @@ class TestWhatsAppDMSessionKeyConsistency: key = build_session_key(source) assert key == "agent:main:telegram:group:-1002285219667:17585" - def test_group_thread_sessions_are_isolated_per_user(self): + def test_group_thread_sessions_are_shared_by_default(self): + """Threads default to shared sessions — user_id is NOT appended.""" + alice = SessionSource( + platform=Platform.TELEGRAM, + chat_id="-1002285219667", + chat_type="group", + thread_id="17585", + user_id="alice", + ) + bob = SessionSource( + platform=Platform.TELEGRAM, + chat_id="-1002285219667", + chat_type="group", + thread_id="17585", + user_id="bob", + ) + assert build_session_key(alice) == "agent:main:telegram:group:-1002285219667:17585" + assert build_session_key(bob) == "agent:main:telegram:group:-1002285219667:17585" + assert build_session_key(alice) == build_session_key(bob) + + def test_group_thread_sessions_can_be_isolated_per_user(self): + """thread_sessions_per_user=True restores per-user isolation in threads.""" source = SessionSource( platform=Platform.TELEGRAM, chat_id="-1002285219667", @@ -644,9 +728,60 @@ class TestWhatsAppDMSessionKeyConsistency: thread_id="17585", user_id="42", ) - key = build_session_key(source) + key = build_session_key(source, thread_sessions_per_user=True) assert key == "agent:main:telegram:group:-1002285219667:17585:42" + def test_non_thread_group_sessions_still_isolated_per_user(self): + """Regular group messages (no thread_id) remain per-user by default.""" + alice = SessionSource( + platform=Platform.TELEGRAM, + chat_id="-1002285219667", + chat_type="group", + user_id="alice", + ) + bob = SessionSource( + platform=Platform.TELEGRAM, + chat_id="-1002285219667", + chat_type="group", + user_id="bob", + ) + assert build_session_key(alice) == "agent:main:telegram:group:-1002285219667:alice" + assert build_session_key(bob) == "agent:main:telegram:group:-1002285219667:bob" + assert build_session_key(alice) != build_session_key(bob) + + def test_discord_thread_sessions_shared_by_default(self): + """Discord threads are shared across participants by default.""" + alice = SessionSource( + platform=Platform.DISCORD, + chat_id="guild-123", + chat_type="thread", + thread_id="thread-456", + user_id="alice", + ) + bob = SessionSource( + platform=Platform.DISCORD, + chat_id="guild-123", + chat_type="thread", + thread_id="thread-456", + user_id="bob", + ) + assert build_session_key(alice) == build_session_key(bob) + assert "alice" not in build_session_key(alice) + assert "bob" not in build_session_key(bob) + + def test_dm_thread_sessions_not_affected(self): + """DM threads use their own keying logic and are not affected.""" + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id="99", + chat_type="dm", + thread_id="topic-1", + user_id="42", + ) + key = build_session_key(source) + # DM logic: chat_id + thread_id, user_id never included + assert key == "agent:main:telegram:dm:99:topic-1" + class TestSessionStoreEntriesAttribute: """Regression: /reset must access _entries, not _sessions."""