Compare commits

..

1 Commits

Author SHA1 Message Date
teknium1
4d873f77c1 feat(cli): add /reasoning command for effort level and display toggle
Combined implementation of reasoning management:
- /reasoning              Show current effort level and display state
- /reasoning <level>      Set reasoning effort (none, low, medium, high, xhigh)
- /reasoning show|on      Show model thinking/reasoning in output
- /reasoning hide|off     Hide model thinking/reasoning from output

Effort level changes persist to config and force agent re-init.
Display toggle updates the agent callback dynamically without re-init.

When display is enabled:
- Intermediate reasoning shown as dim [thinking] lines during tool loops
- Final reasoning shown in a bordered box above the response
- Long reasoning collapsed (5 lines intermediate, 10 lines final)

Also adds:
- reasoning_callback parameter to AIAgent
- last_reasoning in run_conversation result dict
- show_reasoning config option (display section, default: false)
- Display section in /config output
- 34 tests covering both features

Combines functionality from PR #789 and PR #790.

Co-authored-by: Aum Desai <Aum08Desai@users.noreply.github.com>
Co-authored-by: 0xbyt4 <35742124+0xbyt4@users.noreply.github.com>
2026-03-11 06:02:18 -07:00
13 changed files with 567 additions and 577 deletions

View File

@@ -670,6 +670,11 @@ display:
# Works over SSH. Most terminals can be configured to flash the taskbar or play a sound.
bell_on_complete: false
# Show model reasoning/thinking before each response.
# When enabled, a dim box shows the model's thought process above the response.
# Toggle at runtime with /reasoning show or /reasoning hide.
show_reasoning: false
# ───────────────────────────────────────────────────────────────────────────
# Skin / Theme
# ───────────────────────────────────────────────────────────────────────────

122
cli.py
View File

@@ -205,6 +205,7 @@ def load_cli_config() -> Dict[str, Any]:
"display": {
"compact": False,
"resume_display": "full",
"show_reasoning": False,
"skin": "default",
},
"clarify": {
@@ -1121,6 +1122,8 @@ class HermesCLI:
self.resume_display = CLI_CONFIG["display"].get("resume_display", "full")
# bell_on_complete: play terminal bell (\a) when agent finishes a response
self.bell_on_complete = CLI_CONFIG["display"].get("bell_on_complete", False)
# show_reasoning: display model thinking/reasoning before the response
self.show_reasoning = CLI_CONFIG["display"].get("show_reasoning", False)
self.verbose = verbose if verbose is not None else (self.tool_progress_mode == "verbose")
# Configuration - priority: CLI args > env vars > config file
@@ -1253,7 +1256,6 @@ class HermesCLI:
# Background task tracking: {task_id: threading.Thread}
self._background_tasks: Dict[str, threading.Thread] = {}
self._background_task_counter = 0
self._stream_buf = ""
def _invalidate(self, min_interval: float = 0.25) -> None:
"""Throttled UI repaint — prevents terminal blinking on slow/SSH connections."""
@@ -1496,7 +1498,7 @@ class HermesCLI:
platform="cli",
session_db=self._session_db,
clarify_callback=self._clarify_callback,
stream_delta_callback=self._stream_delta,
reasoning_callback=self._on_reasoning if self.show_reasoning else None,
honcho_session_key=self.session_id,
fallback_model=self._fallback_model,
thinking_callback=self._on_thinking,
@@ -2850,6 +2852,8 @@ class HermesCLI:
self._show_gateway_status()
elif cmd_lower == "/verbose":
self._toggle_verbose()
elif cmd_lower.startswith("/reasoning"):
self._handle_reasoning_command(cmd_original)
elif cmd_lower == "/compress":
self._manual_compress()
elif cmd_lower == "/usage":
@@ -3075,6 +3079,75 @@ class HermesCLI:
}
self.console.print(labels.get(self.tool_progress_mode, ""))
def _handle_reasoning_command(self, cmd: str):
    """Handle /reasoning — manage effort level and display toggle.

    Usage:
        /reasoning              Show current effort level and display state
        /reasoning <level>      Set reasoning effort (none, low, medium, high, xhigh)
        /reasoning show|on      Show model thinking/reasoning in output
        /reasoning hide|off     Hide model thinking/reasoning from output
    """
    tokens = cmd.strip().split(maxsplit=1)

    # Bare "/reasoning": report the current effort level and display state.
    if len(tokens) < 2:
        cfg = self.reasoning_config
        if cfg is None:
            level = "medium (default)"
        elif cfg.get("enabled") is False:
            level = "none (disabled)"
        else:
            level = cfg.get("effort", "medium")
        shown = "on" if self.show_reasoning else "off"
        _cprint(f" {_GOLD}Reasoning effort: {level}{_RST}")
        _cprint(f" {_GOLD}Reasoning display: {shown}{_RST}")
        _cprint(f" {_DIM}Usage: /reasoning <none|low|medium|high|xhigh|show|hide>{_RST}")
        return

    arg = tokens[1].strip().lower()

    # Display toggle — flips the live agent callback without re-init.
    if arg in ("show", "on", "hide", "off"):
        enable = arg in ("show", "on")
        self.show_reasoning = enable
        if self.agent:
            self.agent.reasoning_callback = self._on_reasoning if enable else None
        if enable:
            _cprint(f" {_GOLD}Reasoning display: ON{_RST}")
            _cprint(f" {_DIM}Model thinking will be shown during and after each response.{_RST}")
        else:
            _cprint(f" {_GOLD}Reasoning display: OFF{_RST}")
        return

    # Effort level change — anything else must parse as a known level.
    parsed = _parse_reasoning_config(arg)
    if parsed is None:
        _cprint(f" {_DIM}(._.) Unknown argument: {arg}{_RST}")
        _cprint(f" {_DIM}Valid levels: none, low, minimal, medium, high, xhigh{_RST}")
        _cprint(f" {_DIM}Display: show, hide{_RST}")
        return

    self.reasoning_config = parsed
    self.agent = None  # Force agent re-init with new reasoning config
    saved = save_config_value("agent.reasoning_effort", arg)
    suffix = "(saved to config)" if saved else "(session only)"
    _cprint(f" {_GOLD}Reasoning effort set to '{arg}' {suffix}{_RST}")
def _on_reasoning(self, reasoning_text: str):
    """Callback for intermediate reasoning display during tool-call loops."""
    body = reasoning_text.strip()
    rows = body.splitlines()
    # Collapse anything longer than 5 lines to keep the loop output compact.
    if len(rows) > 5:
        preview = "\n".join(rows[:5] + [f" ... ({len(rows) - 5} more lines)"])
    else:
        preview = body
    _cprint(f" {_DIM}[thinking] {preview}{_RST}")
def _manual_compress(self):
"""Manually trigger context compression on the current conversation."""
if not self.conversation_history or len(self.conversation_history) < 4:
@@ -3341,28 +3414,6 @@ class HermesCLI:
"Use your best judgement to make the choice and proceed."
)
_stream_started = False
def _stream_delta(self, text: str):
    """Buffer streaming tokens; emit complete lines via _cprint."""
    if not text:
        return
    if not self._stream_started:
        # Drop leading blank lines before the first visible token.
        text = text.lstrip("\n")
        if not text:
            return
        self._stream_started = True
    pending = self._stream_buf + text
    # Flush every complete line; keep the trailing partial line buffered.
    nl = pending.find("\n")
    while nl >= 0:
        _cprint(pending[:nl])
        pending = pending[nl + 1:]
        nl = pending.find("\n")
    self._stream_buf = pending
def _flush_stream(self):
    """Emit any remaining partial line from the stream buffer."""
    leftover = self._stream_buf
    if leftover:
        _cprint(leftover)
        self._stream_buf = ""
def _sudo_password_callback(self) -> str:
"""
Prompt for sudo password through the prompt_toolkit UI.
@@ -3491,8 +3542,6 @@ class HermesCLI:
# Add user message to history
self.conversation_history.append({"role": "user", "content": message})
self._stream_buf = ""
self._stream_started = False
_cprint(f"{_GOLD}{'' * 40}{_RST}")
print(flush=True)
@@ -3540,7 +3589,6 @@ class HermesCLI:
agent_thread.join(0.1)
agent_thread.join() # Ensure agent thread completes
self._flush_stream()
# Drain any remaining agent output still in the StdoutProxy
# buffer so tool/status lines render ABOVE our response box.
@@ -3569,7 +3617,25 @@ class HermesCLI:
if response and pending_message:
response = response + "\n\n---\n_[Interrupted - processing new message]_"
if response and not (self.agent and self.agent.stream_delta_callback):
# Display reasoning (thinking) box if enabled and available
if self.show_reasoning and result:
reasoning = result.get("last_reasoning")
if reasoning:
w = shutil.get_terminal_size().columns
r_label = " Reasoning "
r_fill = w - 2 - len(r_label)
r_top = f"{_DIM}┌─{r_label}{'' * max(r_fill - 1, 0)}{_RST}"
r_bot = f"{_DIM}{'' * (w - 2)}{_RST}"
# Collapse long reasoning: show first 10 lines
lines = reasoning.strip().splitlines()
if len(lines) > 10:
display_reasoning = "\n".join(lines[:10])
display_reasoning += f"\n{_DIM} ... ({len(lines) - 10} more lines){_RST}"
else:
display_reasoning = reasoning.strip()
_cprint(f"\n{r_top}\n{_DIM}{display_reasoning}{_RST}\n{r_bot}")
if response:
# Use a Rich Panel for the response box — adapts to terminal
# width at render time instead of hard-coding border length.
try:

View File

@@ -413,36 +413,6 @@ class BasePlatformAdapter(ABC):
"""
return SendResult(success=False, error="Not supported")
@property
def supports_streaming(self) -> bool:
    """Whether this platform supports response streaming via message edits."""
    # Base default: streaming off; adapters that can edit messages override.
    return False
@property
def supports_draft_streaming(self) -> bool:
    """Whether this platform supports native draft streaming (Bot API 9.3+)."""
    # Base default: no native drafts; Telegram-style adapters override.
    return False
async def send_draft(self, chat_id: str, draft_id: int, text: str, metadata: dict = None) -> bool:
    """Push a draft text update. Override in subclasses."""
    # Base default: report failure so callers fall back to normal sends.
    return False
async def finalize_draft(self, chat_id: str, content: str, metadata: dict = None) -> "SendResult":
    """Finalize a draft stream with the completed message."""
    # Base default: unsupported; adapters with draft streaming override.
    return SendResult(success=False, error="Not supported")
async def delete_message(self, chat_id: str, message_id: str) -> SendResult:
    """Delete a previously sent message."""
    # Base default: deletion unsupported on this platform.
    return SendResult(success=False, error="Not supported")
async def send_raw(self, chat_id: str, content: str, metadata: dict = None) -> "SendResult":
    """Send without formatting (default: delegates to send)."""
    # Base default: no distinct raw path — reuse the formatted send.
    return await self.send(chat_id=chat_id, content=content, metadata=metadata)
async def edit_message_raw(self, chat_id: str, message_id: str, content: str) -> "SendResult":
    """Edit without formatting (default: delegates to edit_message)."""
    # Base default: no distinct raw path — reuse the formatted edit.
    return await self.edit_message(chat_id=chat_id, message_id=message_id, content=content)
async def send_typing(self, chat_id: str, metadata=None) -> None:
"""
Send a typing indicator.
@@ -727,20 +697,11 @@ class BasePlatformAdapter(ABC):
try:
# Call the handler (this can take a while with tool calls)
handler_result = await self._message_handler(event)
# Normalise: handler may return str or dict(content, already_sent)
already_sent = False
if isinstance(handler_result, dict):
response = handler_result.get("content") or ""
already_sent = handler_result.get("already_sent", False)
else:
response = handler_result
response = await self._message_handler(event)
# Send response if any
if not response:
if not already_sent:
logger.warning("[%s] Handler returned empty/None response for %s", self.name, event.source.chat_id)
logger.warning("[%s] Handler returned empty/None response for %s", self.name, event.source.chat_id)
if response:
# Extract MEDIA:<path> tags (from TTS tool) before other processing
media_files, response = self.extract_media(response)
@@ -751,7 +712,7 @@ class BasePlatformAdapter(ABC):
logger.info("[%s] extract_images found %d image(s) in response (%d chars)", self.name, len(images), len(response))
# Send the text portion first (if any remains after extractions)
if text_content and not already_sent:
if text_content:
logger.info("[%s] Sending response (%d chars) to %s", self.name, len(text_content), event.source.chat_id)
result = await self.send(
chat_id=event.source.chat_id,

View File

@@ -299,99 +299,6 @@ class TelegramAdapter(BasePlatformAdapter):
)
return SendResult(success=False, error=str(e))
async def send_raw(
    self, chat_id: str, content: str, metadata: dict = None,
) -> SendResult:
    """Send a plain-text message without MarkdownV2 formatting."""
    if not self._bot:
        return SendResult(success=False, error="Not connected")
    try:
        # Optional forum-topic routing taken from metadata["thread_id"].
        thread_id = metadata.get("thread_id") if metadata else None
        # parse_mode=None disables Telegram markup parsing entirely.
        msg = await self._bot.send_message(
            chat_id=int(chat_id), text=content, parse_mode=None,
            message_thread_id=int(thread_id) if thread_id else None,
        )
        return SendResult(success=True, message_id=str(msg.message_id))
    except Exception as e:
        return SendResult(success=False, error=str(e))
async def edit_message_raw(
    self, chat_id: str, message_id: str, content: str,
) -> SendResult:
    """Edit a message with plain text (no MarkdownV2 formatting)."""
    if not self._bot:
        return SendResult(success=False, error="Not connected")
    try:
        # parse_mode=None keeps the edited text free of markup parsing.
        await self._bot.edit_message_text(
            chat_id=int(chat_id), message_id=int(message_id),
            text=content, parse_mode=None,
        )
        return SendResult(success=True, message_id=message_id)
    except Exception as e:
        return SendResult(success=False, error=str(e))
@property
def supports_streaming(self) -> bool:
    """Telegram supports response streaming via repeated message edits."""
    return True
@property
def supports_draft_streaming(self) -> bool:
    """Whether this adapter supports Telegram Bot API sendMessageDraft (9.3+)."""
    return True
async def send_draft(
    self, chat_id: str, draft_id: int, text: str, metadata: dict = None,
) -> bool:
    """Push a draft update via sendMessageDraft (Bot API 9.3+)."""
    if not self._bot:
        return False
    try:
        # Optional forum-topic routing taken from metadata["thread_id"].
        thread_id = metadata.get("thread_id") if metadata else None
        # Drafts are sent unformatted; formatting happens at finalize time.
        return await self._bot.send_message_draft(
            chat_id=int(chat_id), draft_id=draft_id, text=text,
            parse_mode=None,
            message_thread_id=int(thread_id) if thread_id else None,
        )
    except Exception as e:
        # Best-effort: a failed draft update is logged, never raised.
        logger.warning("[%s] send_message_draft failed: %s", self.name, e)
        return False
async def finalize_draft(
    self, chat_id: str, content: str, metadata: dict = None,
) -> SendResult:
    """Finalize a draft stream by sending the completed message with formatting."""
    if not self._bot:
        return SendResult(success=False, error="Not connected")
    try:
        thread_id = metadata.get("thread_id") if metadata else None
        formatted = self.format_message(content)
        # Try MarkdownV2 first; fall back to plain text if Telegram
        # rejects the formatted payload (e.g. unescaped characters).
        try:
            msg = await self._bot.send_message(
                chat_id=int(chat_id), text=formatted,
                parse_mode=ParseMode.MARKDOWN_V2,
                message_thread_id=int(thread_id) if thread_id else None,
            )
        except Exception:
            msg = await self._bot.send_message(
                chat_id=int(chat_id), text=content, parse_mode=None,
                message_thread_id=int(thread_id) if thread_id else None,
            )
        return SendResult(success=True, message_id=str(msg.message_id))
    except Exception as e:
        return SendResult(success=False, error=str(e))
async def delete_message(self, chat_id: str, message_id: str) -> SendResult:
    """Delete a Telegram message."""
    if not self._bot:
        return SendResult(success=False, error="Not connected")
    try:
        await self._bot.delete_message(
            chat_id=int(chat_id), message_id=int(message_id),
        )
        return SendResult(success=True, message_id=message_id)
    except Exception as e:
        return SendResult(success=False, error=str(e))
async def send_voice(
self,
chat_id: str,

View File

@@ -35,6 +35,7 @@ COMMANDS_BY_CATEGORY = {
"/prompt": "View/set custom system prompt",
"/personality": "Set a predefined personality",
"/verbose": "Cycle tool progress display: off → new → all → verbose",
"/reasoning": "Manage reasoning effort and display (usage: /reasoning [level|show|hide])",
"/skin": "Show or change the display skin/theme",
},
"Tools & Skills": {

View File

@@ -143,6 +143,7 @@ DEFAULT_CONFIG = {
"personality": "kawaii",
"resume_display": "full",
"bell_on_complete": False,
"show_reasoning": False,
"skin": "default",
},
@@ -1025,6 +1026,14 @@ def show_config():
print(f" Max turns: {config.get('agent', {}).get('max_turns', DEFAULT_CONFIG['agent']['max_turns'])}")
print(f" Toolsets: {', '.join(config.get('toolsets', ['all']))}")
# Display
print()
print(color("◆ Display", Colors.CYAN, Colors.BOLD))
display = config.get('display', {})
print(f" Personality: {display.get('personality', 'kawaii')}")
print(f" Reasoning: {'on' if display.get('show_reasoning', False) else 'off'}")
print(f" Bell: {'on' if display.get('bell_on_complete', False) else 'off'}")
# Terminal
print()
print(color("◆ Terminal", Colors.CYAN, Colors.BOLD))

View File

@@ -173,9 +173,9 @@ class AIAgent:
session_id: str = None,
tool_progress_callback: callable = None,
thinking_callback: callable = None,
reasoning_callback: callable = None,
clarify_callback: callable = None,
step_callback: callable = None,
stream_delta_callback: callable = None,
max_tokens: int = None,
reasoning_config: Dict[str, Any] = None,
prefill_messages: List[Dict[str, Any]] = None,
@@ -261,9 +261,9 @@ class AIAgent:
self.tool_progress_callback = tool_progress_callback
self.thinking_callback = thinking_callback
self.reasoning_callback = reasoning_callback
self.clarify_callback = clarify_callback
self.step_callback = step_callback
self.stream_delta_callback = stream_delta_callback
self._last_reported_tool = None # Track for "new tool" mode
# Interrupt mechanism for breaking out of tool loops
@@ -2062,147 +2062,6 @@ class AIAgent:
return terminal_response
raise RuntimeError("Responses create(stream=True) fallback did not emit a terminal response.")
def _interruptible_streaming_api_call(self, api_kwargs: dict, on_first_delta=None):
    """Streaming variant of _interruptible_api_call for chat_completions.
    Fires self.stream_delta_callback(text) as content tokens arrive and
    accumulates the full response into a SimpleNamespace matching the shape
    downstream code expects. Falls back to the non-streaming path when the
    provider rejects the stream request.
    """
    from types import SimpleNamespace
    # Cross-thread result holder: the worker writes, the main loop reads.
    result = {"response": None, "error": None}
    # One-element list so the nested closure can mutate the flag.
    first_delta_fired = [False]
    def _stream():
        # Worker thread: consumes the SSE stream and accumulates the reply.
        try:
            stream_kwargs = {**api_kwargs, "stream": True,
                "stream_options": {"include_usage": True}}
            stream_resp = self.client.chat.completions.create(**stream_kwargs)
            content_parts = []        # text deltas, joined at the end
            tool_calls_acc = {}       # index -> partially assembled tool call
            finish_reason = "stop"
            usage = None
            reasoning_content = None
            model = None
            has_tool_calls = False
            try:
                for chunk in stream_resp:
                    if not chunk.choices:
                        # Usage-only chunk (from include_usage) has no choices.
                        if hasattr(chunk, "usage") and chunk.usage:
                            usage = chunk.usage
                        continue
                    choice = chunk.choices[0]
                    if choice.finish_reason:
                        finish_reason = choice.finish_reason
                    if model is None and hasattr(chunk, "model"):
                        model = chunk.model
                    delta = choice.delta
                    if delta is None:
                        continue
                    if delta.content:
                        content_parts.append(delta.content)
                        if not first_delta_fired[0]:
                            first_delta_fired[0] = True
                            # Lets the caller stop its "thinking" spinner.
                            if on_first_delta:
                                on_first_delta()
                        # Suppress live output once tool calls start — that
                        # turn's text is intermediate, not the final answer.
                        if self.stream_delta_callback and not has_tool_calls:
                            try:
                                self.stream_delta_callback(delta.content)
                            except Exception:
                                pass
                    if delta.tool_calls:
                        has_tool_calls = True
                        # Tool calls arrive fragmented; merge by index.
                        for tc_delta in delta.tool_calls:
                            idx = tc_delta.index
                            if idx not in tool_calls_acc:
                                tool_calls_acc[idx] = {
                                    "id": tc_delta.id or "",
                                    "type": tc_delta.type or "function",
                                    "function": {
                                        "name": getattr(tc_delta.function, "name", None) or "",
                                        "arguments": getattr(tc_delta.function, "arguments", None) or "",
                                    },
                                }
                            else:
                                entry = tool_calls_acc[idx]
                                if tc_delta.id:
                                    entry["id"] = tc_delta.id
                                fn = tc_delta.function
                                if fn:
                                    if fn.name:
                                        entry["function"]["name"] = fn.name
                                    if fn.arguments:
                                        # Argument JSON streams in pieces; concatenate.
                                        entry["function"]["arguments"] += fn.arguments
                    # Provider-dependent reasoning delta field names.
                    rc = getattr(delta, "reasoning_content", None) or getattr(delta, "reasoning", None)
                    if rc:
                        reasoning_content = (reasoning_content or "") + rc
            finally:
                # Always release the HTTP stream, even on error/interrupt.
                close_fn = getattr(stream_resp, "close", None)
                if callable(close_fn):
                    try:
                        close_fn()
                    except Exception:
                        pass
            # Rebuild the non-streaming response shape downstream code expects.
            tool_calls_list = None
            if tool_calls_acc:
                tool_calls_list = [
                    SimpleNamespace(
                        id=tc["id"], call_id=tc["id"], type=tc["type"],
                        function=SimpleNamespace(name=tc["function"]["name"],
                            arguments=tc["function"]["arguments"]),
                    )
                    for idx, tc in sorted(tool_calls_acc.items())
                ]
            message = SimpleNamespace(
                content="".join(content_parts) or None,
                tool_calls=tool_calls_list,
                reasoning=reasoning_content,
                reasoning_content=reasoning_content,
                reasoning_details=None,
            )
            result["response"] = SimpleNamespace(
                choices=[SimpleNamespace(message=message, finish_reason=finish_reason)],
                usage=usage,
                model=model,
            )
        except Exception as e:
            result["error"] = e
    t = threading.Thread(target=_stream, daemon=True)
    t.start()
    # Poll so the main thread stays responsive to interrupt requests.
    while t.is_alive():
        t.join(timeout=0.3)
        if self._interrupt_requested:
            # Closing the client aborts the worker's in-flight stream;
            # then rebuild the client so later calls still work.
            try:
                self.client.close()
            except Exception:
                pass
            try:
                self.client = OpenAI(**self._client_kwargs)
            except Exception:
                pass
            raise InterruptedError("Agent interrupted during streaming API call")
    if result["error"] is not None:
        err = result["error"]
        err_str = str(err).lower()
        # Heuristic: stream-rejection errors trigger a non-streaming retry.
        if any(kw in err_str for kw in ("stream", "not support", "unsupported")):
            logger.debug("Streaming failed (%s), falling back to non-streaming.", err)
            return self._interruptible_api_call(api_kwargs)
        raise err
    return result["response"]
def _try_refresh_codex_client_credentials(self, *, force: bool = True) -> bool:
if self.api_mode != "codex_responses" or self.provider != "openai-codex":
return False
@@ -2563,6 +2422,12 @@ class AIAgent:
preview = reasoning_text[:100] + "..." if len(reasoning_text) > 100 else reasoning_text
logging.debug(f"Captured reasoning ({len(reasoning_text)} chars): {preview}")
if reasoning_text and self.reasoning_callback:
try:
self.reasoning_callback(reasoning_text)
except Exception:
pass
msg = {
"role": "assistant",
"content": assistant_message.content or "",
@@ -3617,17 +3482,7 @@ class AIAgent:
if os.getenv("HERMES_DUMP_REQUESTS", "").strip().lower() in {"1", "true", "yes", "on"}:
self._dump_api_request_debug(api_kwargs, reason="preflight")
if self.stream_delta_callback and self.api_mode != "codex_responses":
def _stop_spinner():
nonlocal thinking_spinner
if thinking_spinner:
thinking_spinner.stop("")
thinking_spinner = None
response = self._interruptible_streaming_api_call(
api_kwargs, on_first_delta=_stop_spinner)
else:
response = self._interruptible_api_call(api_kwargs)
response = self._interruptible_api_call(api_kwargs)
api_duration = time.time() - api_start_time
@@ -4383,8 +4238,8 @@ class AIAgent:
turn_content = assistant_message.content or ""
if turn_content and self._has_content_after_think_block(turn_content):
self._last_content_with_tools = turn_content
# Show intermediate commentary — skip when streaming (already in buffer)
if self.quiet_mode and not self.stream_delta_callback:
# Show intermediate commentary so the user can follow along
if self.quiet_mode:
clean = self._strip_think_blocks(turn_content).strip()
if clean:
print(f" ┊ 💬 {clean}")
@@ -4623,9 +4478,17 @@ class AIAgent:
if final_response and not interrupted:
self._honcho_sync(original_user_message, final_response)
# Extract reasoning from the last assistant message (if any)
last_reasoning = None
for msg in reversed(messages):
if msg.get("role") == "assistant" and msg.get("reasoning"):
last_reasoning = msg["reasoning"]
break
# Build result with interrupt info if applicable
result = {
"final_response": final_response,
"last_reasoning": last_reasoning,
"messages": messages,
"api_calls": api_call_count,
"completed": completed,

View File

@@ -11,7 +11,7 @@ EXPECTED_COMMANDS = {
"/help", "/tools", "/toolsets", "/model", "/provider", "/prompt",
"/personality", "/clear", "/history", "/new", "/reset", "/retry",
"/undo", "/save", "/config", "/cron", "/skills", "/platforms",
"/verbose", "/compress", "/title", "/usage", "/insights", "/paste",
"/verbose", "/reasoning", "/compress", "/title", "/usage", "/insights", "/paste",
"/reload-mcp", "/rollback", "/background", "/skin", "/quit",
}

View File

@@ -0,0 +1,422 @@
"""Tests for the combined /reasoning command.
Covers both reasoning effort level management and reasoning display toggle,
plus the reasoning extraction and display pipeline from run_agent through CLI.
Combines functionality from:
- PR #789 (Aum08Desai): reasoning effort level management
- PR #790 (0xbyt4): reasoning display toggle and rendering
"""
import unittest
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
# ---------------------------------------------------------------------------
# Effort level parsing
# ---------------------------------------------------------------------------
class TestParseReasoningConfig(unittest.TestCase):
    """Verify _parse_reasoning_config handles all effort levels."""

    def _parse(self, effort):
        from cli import _parse_reasoning_config
        return _parse_reasoning_config(effort)

    def test_none_disables(self):
        self.assertEqual(self._parse("none"), {"enabled": False})

    def test_valid_levels(self):
        for level in ("low", "medium", "high", "xhigh", "minimal"):
            parsed = self._parse(level)
            self.assertIsNotNone(parsed)
            self.assertTrue(parsed.get("enabled"))
            self.assertEqual(parsed["effort"], level)

    def test_empty_returns_none(self):
        for blank in ("", " "):
            self.assertIsNone(self._parse(blank))

    def test_unknown_returns_none(self):
        for bogus in ("ultra", "turbo"):
            self.assertIsNone(self._parse(bogus))

    def test_case_insensitive(self):
        parsed = self._parse("HIGH")
        self.assertIsNotNone(parsed)
        self.assertEqual(parsed["effort"], "high")
# ---------------------------------------------------------------------------
# /reasoning command handler (combined effort + display)
# ---------------------------------------------------------------------------
class TestHandleReasoningCommand(unittest.TestCase):
"""Test the combined _handle_reasoning_command method."""
def _make_cli(self, reasoning_config=None, show_reasoning=False):
"""Create a minimal CLI stub with the reasoning attributes."""
stub = SimpleNamespace(
reasoning_config=reasoning_config,
show_reasoning=show_reasoning,
agent=MagicMock(),
)
return stub
def test_show_enables_display(self):
stub = self._make_cli(show_reasoning=False)
# Simulate /reasoning show
arg = "show"
if arg in ("show", "on"):
stub.show_reasoning = True
stub.agent.reasoning_callback = lambda x: None
self.assertTrue(stub.show_reasoning)
def test_hide_disables_display(self):
stub = self._make_cli(show_reasoning=True)
# Simulate /reasoning hide
arg = "hide"
if arg in ("hide", "off"):
stub.show_reasoning = False
stub.agent.reasoning_callback = None
self.assertFalse(stub.show_reasoning)
self.assertIsNone(stub.agent.reasoning_callback)
def test_on_enables_display(self):
stub = self._make_cli(show_reasoning=False)
arg = "on"
if arg in ("show", "on"):
stub.show_reasoning = True
self.assertTrue(stub.show_reasoning)
def test_off_disables_display(self):
stub = self._make_cli(show_reasoning=True)
arg = "off"
if arg in ("hide", "off"):
stub.show_reasoning = False
self.assertFalse(stub.show_reasoning)
def test_effort_level_sets_config(self):
"""Setting an effort level should update reasoning_config."""
from cli import _parse_reasoning_config
stub = self._make_cli()
arg = "high"
parsed = _parse_reasoning_config(arg)
stub.reasoning_config = parsed
self.assertEqual(stub.reasoning_config, {"enabled": True, "effort": "high"})
def test_effort_none_disables_reasoning(self):
from cli import _parse_reasoning_config
stub = self._make_cli()
parsed = _parse_reasoning_config("none")
stub.reasoning_config = parsed
self.assertEqual(stub.reasoning_config, {"enabled": False})
def test_invalid_argument_rejected(self):
"""Invalid arguments should be rejected (parsed returns None)."""
from cli import _parse_reasoning_config
parsed = _parse_reasoning_config("turbo")
self.assertIsNone(parsed)
def test_no_args_shows_status(self):
"""With no args, should show current state (no crash)."""
stub = self._make_cli(reasoning_config=None, show_reasoning=False)
rc = stub.reasoning_config
if rc is None:
level = "medium (default)"
elif rc.get("enabled") is False:
level = "none (disabled)"
else:
level = rc.get("effort", "medium")
display_state = "on" if stub.show_reasoning else "off"
self.assertEqual(level, "medium (default)")
self.assertEqual(display_state, "off")
def test_status_with_disabled_reasoning(self):
stub = self._make_cli(reasoning_config={"enabled": False}, show_reasoning=True)
rc = stub.reasoning_config
if rc is None:
level = "medium (default)"
elif rc.get("enabled") is False:
level = "none (disabled)"
else:
level = rc.get("effort", "medium")
self.assertEqual(level, "none (disabled)")
def test_status_with_explicit_level(self):
stub = self._make_cli(
reasoning_config={"enabled": True, "effort": "xhigh"},
show_reasoning=True,
)
rc = stub.reasoning_config
level = rc.get("effort", "medium")
self.assertEqual(level, "xhigh")
# ---------------------------------------------------------------------------
# Reasoning extraction and result dict
# ---------------------------------------------------------------------------
class TestLastReasoningInResult(unittest.TestCase):
    """Verify reasoning extraction from the messages list."""

    @staticmethod
    def _extract_last(messages):
        # Mirrors run_conversation's reverse scan for assistant reasoning.
        for msg in reversed(messages):
            if msg.get("role") == "assistant" and msg.get("reasoning"):
                return msg["reasoning"]
        return None

    def _build_messages(self, reasoning=None):
        return [
            {"role": "user", "content": "hello"},
            {
                "role": "assistant",
                "content": "Hi there!",
                "reasoning": reasoning,
                "finish_reason": "stop",
            },
        ]

    def test_reasoning_present(self):
        msgs = self._build_messages(reasoning="Let me think...")
        self.assertEqual(self._extract_last(msgs), "Let me think...")

    def test_reasoning_none(self):
        msgs = self._build_messages(reasoning=None)
        self.assertIsNone(self._extract_last(msgs))

    def test_picks_last_assistant(self):
        msgs = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "...", "reasoning": "first thought"},
            {"role": "tool", "content": "result"},
            {"role": "assistant", "content": "done!", "reasoning": "final thought"},
        ]
        self.assertEqual(self._extract_last(msgs), "final thought")

    def test_empty_reasoning_treated_as_none(self):
        msgs = self._build_messages(reasoning="")
        self.assertIsNone(self._extract_last(msgs))
# ---------------------------------------------------------------------------
# Reasoning display collapse
# ---------------------------------------------------------------------------
class TestReasoningCollapse(unittest.TestCase):
    """Verify long reasoning is collapsed to 10 lines in the box."""

    @staticmethod
    def _collapse(text, limit):
        # Mirrors the CLI collapse logic: keep `limit` lines plus a summary.
        rows = text.strip().splitlines()
        if len(rows) <= limit:
            return rows
        return rows[:limit] + [f" ... ({len(rows) - limit} more lines)"]

    def test_short_reasoning_not_collapsed(self):
        text = "\n".join(f"Line {i}" for i in range(5))
        self.assertLessEqual(len(text.strip().splitlines()), 10)

    def test_long_reasoning_collapsed(self):
        text = "\n".join(f"Line {i}" for i in range(25))
        self.assertGreater(len(text.strip().splitlines()), 10)
        collapsed = self._collapse(text, 10)
        self.assertEqual(len(collapsed), 11)
        self.assertIn("15 more lines", collapsed[-1])

    def test_exactly_10_lines_not_collapsed(self):
        rows = "\n".join(f"Line {i}" for i in range(10)).strip().splitlines()
        self.assertEqual(len(rows), 10)
        self.assertFalse(len(rows) > 10)

    def test_intermediate_callback_collapses_to_5(self):
        """_on_reasoning shows max 5 lines."""
        text = "\n".join(f"Step {i}" for i in range(12))
        collapsed = self._collapse(text, 5)
        self.assertEqual(len(collapsed), 6)
        self.assertIn("7 more lines", collapsed[-1])
# ---------------------------------------------------------------------------
# Reasoning callback
# ---------------------------------------------------------------------------
class TestReasoningCallback(unittest.TestCase):
    """Verify reasoning_callback invocation."""

    @staticmethod
    def _fire(agent, raw_message):
        # Mirrors the agent-side guard before invoking the callback.
        text = agent._extract_reasoning(raw_message)
        if text and agent.reasoning_callback:
            agent.reasoning_callback(text)

    def test_callback_invoked_with_reasoning(self):
        seen = []
        agent = MagicMock()
        agent.reasoning_callback = seen.append
        agent._extract_reasoning = MagicMock(return_value="deep thought")
        self._fire(agent, MagicMock())
        self.assertEqual(seen, ["deep thought"])

    def test_callback_not_invoked_without_reasoning(self):
        seen = []
        agent = MagicMock()
        agent.reasoning_callback = seen.append
        agent._extract_reasoning = MagicMock(return_value=None)
        self._fire(agent, MagicMock())
        self.assertEqual(seen, [])

    def test_callback_none_does_not_crash(self):
        text = "some thought"
        callback = None
        if text and callback:
            callback(text)
        # Reaching this point without an exception is the assertion.
# ---------------------------------------------------------------------------
# Real provider format extraction
# ---------------------------------------------------------------------------
class TestExtractReasoningFormats(unittest.TestCase):
    """Test _extract_reasoning with real provider response formats."""

    def _get_extractor(self):
        # Imported lazily so module collection does not require run_agent's deps.
        from run_agent import AIAgent
        return AIAgent._extract_reasoning

    def test_openrouter_reasoning_details(self):
        # OpenRouter delivers reasoning as a list of summary dicts.
        message = SimpleNamespace(
            reasoning=None,
            reasoning_content=None,
            reasoning_details=[
                {"type": "reasoning.summary", "summary": "Analyzing Python lists."},
            ],
        )
        self.assertIn("Python lists", self._get_extractor()(None, message))

    def test_deepseek_reasoning_field(self):
        # DeepSeek puts the chain of thought in a plain `reasoning` attribute.
        message = SimpleNamespace(
            reasoning="Solving step by step.\nx + y = 8.",
            reasoning_content=None,
        )
        self.assertIn("x + y = 8", self._get_extractor()(None, message))

    def test_moonshot_reasoning_content(self):
        # Moonshot uses `reasoning_content` instead.
        message = SimpleNamespace(
            reasoning_content="Explaining async/await.",
        )
        self.assertIn("async/await", self._get_extractor()(None, message))

    def test_no_reasoning_returns_none(self):
        # A message with no reasoning fields at all yields None.
        message = SimpleNamespace(content="Hello!")
        self.assertIsNone(self._get_extractor()(None, message))
# ---------------------------------------------------------------------------
# Config defaults
# ---------------------------------------------------------------------------
class TestConfigDefault(unittest.TestCase):
    """Verify config default for show_reasoning."""

    def test_default_config_has_show_reasoning(self):
        # The display section must ship the key, defaulted to off.
        from hermes_cli.config import DEFAULT_CONFIG
        display_section = DEFAULT_CONFIG.get("display", {})
        self.assertIn("show_reasoning", display_section)
        self.assertFalse(display_section["show_reasoning"])
class TestCommandRegistered(unittest.TestCase):
    """Verify /reasoning is in the COMMANDS dict."""

    def test_reasoning_in_commands(self):
        from hermes_cli.commands import COMMANDS
        # Registration is all we check; behavior is covered elsewhere.
        self.assertTrue("/reasoning" in COMMANDS)
# ---------------------------------------------------------------------------
# End-to-end pipeline
# ---------------------------------------------------------------------------
class TestEndToEndPipeline(unittest.TestCase):
    """Simulate the full pipeline: extraction -> result dict -> display."""

    def test_openrouter_claude_pipeline(self):
        from run_agent import AIAgent
        # Stage 1: extract reasoning from a fake OpenRouter-style message.
        api_message = SimpleNamespace(
            role="assistant",
            content="Lists support append().",
            tool_calls=None,
            reasoning=None,
            reasoning_content=None,
            reasoning_details=[
                {"type": "reasoning.summary", "summary": "Python list methods."},
            ],
        )
        extracted = AIAgent._extract_reasoning(None, api_message)
        self.assertIsNotNone(extracted)
        # Stage 2: find the most recent assistant reasoning in the transcript.
        transcript = [
            {"role": "user", "content": "How do I add items?"},
            {"role": "assistant", "content": api_message.content, "reasoning": extracted},
        ]
        found = next(
            (
                entry["reasoning"]
                for entry in reversed(transcript)
                if entry.get("role") == "assistant" and entry.get("reasoning")
            ),
            None,
        )
        # Stage 3: the run_conversation-style result dict carries it through.
        outcome = {
            "final_response": api_message.content,
            "last_reasoning": found,
        }
        self.assertIn("last_reasoning", outcome)
        self.assertIn("Python list methods", outcome["last_reasoning"])

    def test_no_reasoning_model_pipeline(self):
        from run_agent import AIAgent
        # A model that emits no reasoning fields propagates None end-to-end.
        api_message = SimpleNamespace(content="Paris.", tool_calls=None)
        extracted = AIAgent._extract_reasoning(None, api_message)
        self.assertIsNone(extracted)
        outcome = {"final_response": api_message.content, "last_reasoning": extracted}
        self.assertIsNone(outcome["last_reasoning"])
# Allow running this test module directly with `python <file>`.
if __name__ == "__main__":
    unittest.main()

View File

@@ -1,257 +0,0 @@
"""Tests for streaming token output — accumulator shape, callback order, fallback."""
import queue
import threading
from types import SimpleNamespace
from unittest.mock import MagicMock, patch, call
import pytest
from run_agent import AIAgent
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _make_tool_defs(*names):
return [
{"type": "function", "function": {"name": n, "description": f"{n}", "parameters": {"type": "object", "properties": {}}}}
for n in names
]
@pytest.fixture()
def agent():
    """An AIAgent built with mocked tools/client and a recording stream callback."""
    stream_cb = MagicMock()
    # Patch out tool discovery and the OpenAI client so construction is hermetic.
    with patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")), \
         patch("run_agent.check_toolset_requirements", return_value={}), \
         patch("run_agent.OpenAI"):
        built = AIAgent(
            api_key="test-key-1234567890",
            quiet_mode=True,
            skip_context_files=True,
            skip_memory=True,
            stream_delta_callback=stream_cb,
        )
    built.client = MagicMock()
    built._stream_cb = stream_cb
    return built
# ---------------------------------------------------------------------------
# Helpers — fake streaming chunks
# ---------------------------------------------------------------------------
def _chunk(content=None, tool_call_delta=None, finish_reason=None, usage=None, model=None):
delta = SimpleNamespace(content=content, tool_calls=tool_call_delta,
reasoning_content=None, reasoning=None)
choice = SimpleNamespace(delta=delta, finish_reason=finish_reason)
c = SimpleNamespace(choices=[choice])
if usage is not None:
c.usage = SimpleNamespace(**usage)
if model:
c.model = model
return c
def _usage_chunk(**kw):
c = SimpleNamespace(choices=[], usage=SimpleNamespace(**kw))
return c
def _tc_delta(index, id=None, name=None, arguments=None, type=None):
fn = SimpleNamespace(name=name, arguments=arguments)
return SimpleNamespace(index=index, id=id, type=type, function=fn)
# ---------------------------------------------------------------------------
# Tests: accumulator shape
# ---------------------------------------------------------------------------
class TestStreamingAccumulator:
    """The streaming accumulator must rebuild a non-streaming response shape."""

    def test_text_only_response(self, agent):
        """Streaming text-only response produces correct synthetic shape."""
        stream = iter([
            _chunk(content="Hello", model="test/m"),
            _chunk(content=" world"),
            _chunk(finish_reason="stop"),
            _usage_chunk(prompt_tokens=10, completion_tokens=5, total_tokens=15),
        ])
        agent.client.chat.completions.create.return_value = stream
        result = agent._interruptible_streaming_api_call({"model": "test"})
        message = result.choices[0].message
        assert message.content == "Hello world"
        assert message.tool_calls is None
        assert result.choices[0].finish_reason == "stop"
        assert result.usage.prompt_tokens == 10
        assert result.model == "test/m"

    def test_tool_call_response(self, agent):
        """Streaming tool-call response accumulates function name + arguments."""
        # Arguments arrive split across two deltas for the same index.
        stream = iter([
            _chunk(tool_call_delta=[_tc_delta(0, id="call_1", name="web_search", arguments='{"q', type="function")]),
            _chunk(tool_call_delta=[_tc_delta(0, arguments='uery": "hi"}')]),
            _chunk(finish_reason="tool_calls"),
        ])
        agent.client.chat.completions.create.return_value = stream
        result = agent._interruptible_streaming_api_call({"model": "test"})
        calls = result.choices[0].message.tool_calls
        assert calls is not None
        assert len(calls) == 1
        assert calls[0].id == "call_1"
        assert calls[0].function.name == "web_search"
        assert calls[0].function.arguments == '{"query": "hi"}'
        assert result.choices[0].finish_reason == "tool_calls"

    def test_mixed_content_and_tool_calls(self, agent):
        """Content + tool calls in same stream are both accumulated."""
        stream = iter([
            _chunk(content="Let me check."),
            _chunk(tool_call_delta=[_tc_delta(0, id="c1", name="web_search", arguments="{}", type="function")]),
            _chunk(finish_reason="tool_calls"),
        ])
        agent.client.chat.completions.create.return_value = stream
        result = agent._interruptible_streaming_api_call({"model": "test"})
        assert result.choices[0].message.content == "Let me check."
        assert len(result.choices[0].message.tool_calls) == 1
class TestStreamingCallbacks:
    """Ordering and gating of the per-delta streaming callbacks."""

    def test_deltas_fire_in_order(self, agent):
        """stream_delta_callback receives content deltas in order."""
        seen = []
        agent.stream_delta_callback = seen.append
        agent.client.chat.completions.create.return_value = iter(
            [_chunk(content="a"), _chunk(content="b"), _chunk(content="c"), _chunk(finish_reason="stop")]
        )
        agent._interruptible_streaming_api_call({"model": "test"})
        assert seen == ["a", "b", "c"]

    def test_on_first_delta_fires_once(self, agent):
        # Two content deltas, but the first-delta hook must fire exactly once.
        on_first = MagicMock()
        agent.client.chat.completions.create.return_value = iter(
            [_chunk(content="x"), _chunk(content="y"), _chunk(finish_reason="stop")]
        )
        agent._interruptible_streaming_api_call({"model": "test"}, on_first_delta=on_first)
        on_first.assert_called_once()

    def test_tool_only_does_not_fire_callback(self, agent):
        """Tool-call-only stream does not invoke stream_delta_callback."""
        seen = []
        agent.stream_delta_callback = seen.append
        agent.client.chat.completions.create.return_value = iter([
            _chunk(tool_call_delta=[_tc_delta(0, id="c1", name="t", arguments="{}", type="function")]),
            _chunk(finish_reason="tool_calls"),
        ])
        agent._interruptible_streaming_api_call({"model": "test"})
        assert seen == []
class TestStreamingFallback:
    """Fallback to a non-streaming call when the provider rejects streaming."""

    def test_stream_error_falls_back(self, agent):
        """When streaming fails with 'not support', falls back to non-streaming."""
        fallback_response = SimpleNamespace(
            choices=[SimpleNamespace(
                message=SimpleNamespace(content="ok", tool_calls=None, reasoning=None, reasoning_content=None, reasoning_details=None),
                finish_reason="stop",
            )],
            usage=None,
            model="test/m",
        )
        # First call raises a streaming-specific error; second succeeds plain.
        agent.client.chat.completions.create.side_effect = [
            Exception("streaming not supported by this provider"),
            fallback_response,
        ]
        result = agent._interruptible_streaming_api_call({"model": "test"})
        assert result.choices[0].message.content == "ok"
        assert agent.client.chat.completions.create.call_count == 2

    def test_non_stream_error_raises(self, agent):
        """Non-stream-related errors propagate normally."""
        agent.client.chat.completions.create.side_effect = ValueError("bad request")
        with pytest.raises(ValueError, match="bad request"):
            agent._interruptible_streaming_api_call({"model": "test"})
# ---------------------------------------------------------------------------
# Tests: base.py already_sent contract
# ---------------------------------------------------------------------------
class TestAlreadySentContract:
    """Contract tests: a handler result with already_sent=True must suppress
    the base adapter's own send() call; a plain-string result must not."""

    def _make_adapter(self, send_side_effect=None):
        # Build a minimal concrete adapter; send() reports content through the
        # optional hook so tests can observe what the base class tried to send.
        from gateway.platforms.base import BasePlatformAdapter, SendResult
        from gateway.config import Platform, PlatformConfig
        class FakeAdapter(BasePlatformAdapter):
            async def connect(self): return True
            async def disconnect(self): pass
            async def get_chat_info(self, chat_id): return {"name": "test"}
            async def send(self, chat_id, content, reply_to=None, metadata=None):
                if send_side_effect is not None:
                    send_side_effect(content)
                return SendResult(success=True, message_id="1")
        cfg = PlatformConfig(enabled=True)
        adapter = FakeAdapter(cfg, Platform.TELEGRAM)
        # Mark the adapter as live so message processing is not short-circuited.
        adapter._running = True
        return adapter

    @pytest.mark.asyncio
    async def test_already_sent_skips_send(self):
        """Handler returning already_sent=True prevents base from calling send()."""
        from gateway.platforms.base import MessageEvent
        from gateway.config import Platform
        from gateway.session import SessionSource
        sent = []
        adapter = self._make_adapter(send_side_effect=lambda c: sent.append(c))
        async def handler(event):
            # Dict result with already_sent=True: the handler claims it has
            # delivered the content itself.
            return {"content": "hello", "already_sent": True}
        adapter.set_message_handler(handler)
        event = MessageEvent(
            text="hi",
            source=SessionSource(platform=Platform.TELEGRAM, chat_id="1", user_id="u1"),
        )
        await adapter._process_message_background(event, "s1")
        assert sent == [], "send() should not be called when already_sent=True"

    @pytest.mark.asyncio
    async def test_string_response_sends_normally(self):
        """Handler returning a plain string triggers send() as before."""
        from gateway.platforms.base import MessageEvent
        from gateway.config import Platform
        from gateway.session import SessionSource
        sent = []
        adapter = self._make_adapter(send_side_effect=lambda c: sent.append(c))
        async def handler(event):
            return "hello"
        adapter.set_message_handler(handler)
        event = MessageEvent(
            text="hi",
            source=SessionSource(platform=Platform.TELEGRAM, chat_id="1", user_id="u1"),
        )
        await adapter._process_message_background(event, "s1")
        assert "hello" in sent

View File

@@ -147,6 +147,7 @@ Type `/` in the interactive CLI to see an autocomplete dropdown.
| `/config` | Show current configuration |
| `/prompt [text]` | View/set custom system prompt |
| `/personality [name]` | Set a predefined personality |
| `/reasoning [arg]` | Manage reasoning effort and display. Args: effort level (`none`, `low`, `medium`, `high`, `xhigh`) or display toggle (`show`, `hide`). No args shows current state. |
### Conversation

View File

@@ -104,6 +104,7 @@ Type `/` to see an autocomplete dropdown of all available commands.
| `/config` | Show current configuration |
| `/prompt [text]` | View/set/clear custom system prompt |
| `/personality [name]` | Set a predefined personality |
| `/reasoning [arg]` | Manage reasoning effort (`none`/`low`/`medium`/`high`/`xhigh`) and display (`show`/`hide`) |
### Conversation Management

View File

@@ -608,6 +608,16 @@ agent:
When unset (default), reasoning effort defaults to "medium" — a balanced level that works well for most tasks. Setting a value overrides it — higher reasoning effort gives better results on complex tasks at the cost of more tokens and latency.
You can also change the reasoning effort at runtime with the `/reasoning` command:
```
/reasoning # Show current effort level and display state
/reasoning high # Set reasoning effort to high
/reasoning none # Disable reasoning
/reasoning show # Show model thinking above each response
/reasoning hide # Hide model thinking
```
## TTS Configuration
```yaml
@@ -632,6 +642,7 @@ display:
compact: false # Compact output mode (less whitespace)
resume_display: full # full (show previous messages on resume) | minimal (one-liner only)
bell_on_complete: false # Play terminal bell when agent finishes (great for long tasks)
show_reasoning: false # Show model reasoning/thinking above each response (toggle with /reasoning show|hide)
```
| Mode | What you see |