mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-14 22:29:09 +08:00
Compare commits
2 Commits
claude-cod
...
feat/web-g
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f6b0b09988 | ||
|
|
853d32fb90 |
56
cli.py
56
cli.py
@@ -3188,6 +3188,13 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
|
||||
self.bell_on_complete = CLI_CONFIG["display"].get("bell_on_complete", False)
|
||||
# show_reasoning: display model thinking/reasoning before the response
|
||||
self.show_reasoning = CLI_CONFIG["display"].get("show_reasoning", False)
|
||||
# Live web_extract summarization box (reasoning-style). Registered
|
||||
# process-wide; no-op for runs that never summarize a web page.
|
||||
try:
|
||||
from tools.summary_display import set_summary_stream_callback
|
||||
set_summary_stream_callback(self._on_summary_stream)
|
||||
except Exception:
|
||||
logger.debug("Failed to register summary stream callback", exc_info=True)
|
||||
_configure_output_history(
|
||||
enabled=CLI_CONFIG["display"].get("persistent_output", True),
|
||||
max_lines=CLI_CONFIG["display"].get("persistent_output_max_lines", 200),
|
||||
@@ -4663,6 +4670,55 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
|
||||
self._deferred_content = ""
|
||||
self._emit_stream_text(deferred)
|
||||
|
||||
# ── Web-extract summary streaming (reasoning-style box) ─────────────
|
||||
|
||||
def _on_summary_stream(self, event: str, **kwargs) -> None:
    """Render web_extract summarization tokens in a dim live box.

    Mirrors the reasoning-box UX: ``"start"`` prints a dim 'Summarizing'
    header, ``"delta"`` text is printed line-by-line, and ``"end"`` closes
    the box with a char count (or a fallback notice). Registered with
    tools.summary_display; called from the summarizer's event loop
    thread, so output goes through _cprint (patch_stdout-safe).

    Args:
        event: ``"start"``, ``"delta"`` or ``"end"``; other values are ignored.
        **kwargs: Event payload — ``url`` for start, ``text`` for delta,
            ``char_count`` and ``ok`` for end.

    Rendering errors are logged at debug level and never propagate.
    """
    try:
        if event == "start":
            url = kwargs.get("url", "")
            w = self._scrollback_box_width()
            label = " Summarizing "
            if url:
                # Clamp the slice bound: on a very narrow box ``w - 23`` goes
                # negative, which would slice from the END of the URL and
                # keep almost all of it instead of shortening it.
                short = url if len(url) <= w - 20 else url[: max(w - 23, 0)] + "..."
                label = f" Summarizing · {short} "
            fill = w - 2 - len(label)
            # Header is "┌─" + label + (fill - 1) dashes + "┐" == w columns.
            _cprint(f"\n{_DIM}┌─{label}{'─' * max(fill - 1, 0)}┐{_RST}")
            self._summary_box_opened = True
            self._summary_buf = ""
        elif event == "delta":
            if not getattr(self, "_summary_box_opened", False):
                return
            self._summary_buf = getattr(self, "_summary_buf", "") + kwargs.get("text", "")
            # Print every completed line; keep the unterminated tail buffered.
            while "\n" in self._summary_buf:
                line, self._summary_buf = self._summary_buf.split("\n", 1)
                _cprint(f"{_DIM}{line}{_RST}")
            # Flush long unterminated text so the box keeps moving.
            if len(self._summary_buf) > 80:
                _cprint(f"{_DIM}{self._summary_buf}{_RST}")
                self._summary_buf = ""
        elif event == "end":
            if not getattr(self, "_summary_box_opened", False):
                return
            # Mark the box closed before rendering so a formatting error
            # below cannot leave it flagged as open.
            self._summary_box_opened = False
            buf = getattr(self, "_summary_buf", "")
            if buf:
                _cprint(f"{_DIM}{buf}{_RST}")
            self._summary_buf = ""
            w = self._scrollback_box_width()
            chars = kwargs.get("char_count", 0)
            tail = f" {chars:,} chars " if kwargs.get("ok") else " summarization fell back "
            fill = w - 2 - len(tail)
            # Footer is "└" + fill dashes + tail + "┘" == w columns, matching
            # the header (the footer has no extra "─" after its corner, so it
            # needs the full ``fill``, not ``fill - 1``).
            _cprint(f"{_DIM}└{'─' * max(fill, 0)}{tail}┘{_RST}")
    except Exception:
        logger.debug("Summary stream render failed", exc_info=True)
|
||||
|
||||
def _stream_delta(self, text) -> None:
|
||||
"""Line-buffered streaming callback for real-time token rendering.
|
||||
|
||||
|
||||
@@ -1017,6 +1017,15 @@ DEFAULT_CONFIG = {
|
||||
"backend": "", # shared fallback — applies to both search and extract
|
||||
"search_backend": "", # per-capability override for web_search (e.g. "searxng")
|
||||
"extract_backend": "", # per-capability override for web_extract (e.g. "native")
|
||||
# Grounded citations on web results:
|
||||
# "auto" — results carry stable source ids; the model is instructed to
|
||||
# cite inline only for research/report-style requests
|
||||
# "always" — model is instructed to cite inline whenever it uses results
|
||||
# "off" — no source ids, no citation guidance (pre-feature behavior)
|
||||
"citations": "auto",
|
||||
# Stream web_extract page summarization live into a reasoning-style
|
||||
# box in the CLI (no effect on gateway/cron).
|
||||
"summary_stream": True,
|
||||
},
|
||||
|
||||
"browser": {
|
||||
|
||||
382
tests/tools/test_web_grounding.py
Normal file
382
tests/tools/test_web_grounding.py
Normal file
@@ -0,0 +1,382 @@
|
||||
"""Tests for web grounding citations + live summary display streaming.
|
||||
|
||||
Covers:
|
||||
- tools/summary_display.py callback registry + single-slot semantics
|
||||
- source registry stable citation IDs
|
||||
- web_search_tool result annotation (source ids + citation_guidance)
|
||||
- web_extract_tool trimmed-output annotation
|
||||
- _try_stream_summarizer streaming fast path + fallback semantics
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from tools import summary_display
|
||||
from tools.web_tools import (
|
||||
CITATION_GUIDANCE,
|
||||
CITATION_GUIDANCE_AUTO,
|
||||
_get_citation_guidance,
|
||||
_get_citations_mode,
|
||||
_summary_stream_enabled,
|
||||
_try_stream_summarizer,
|
||||
get_source_id,
|
||||
reset_source_registry,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def _clean_state():
    """Give every test a pristine source registry and summary display.

    The same reset runs before and after the test body: the citation-id
    registry is cleared, the display callback is unregistered, and the
    display slot is forced free.
    """

    def _reset():
        reset_source_registry()
        summary_display.set_summary_stream_callback(None)
        summary_display._slot_holder = None

    _reset()
    yield
    _reset()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Source registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSourceRegistry:
    """Citation ids are sequential, stable per URL, and resettable."""

    def test_ids_are_stable_and_sequential(self):
        first = get_source_id("https://example.com/a")
        second = get_source_id("https://example.com/b")
        assert first == 1
        assert second == 2
        # Looking up a known URL again must not mint a new id.
        assert get_source_id("https://example.com/a") == first

    def test_normalization_fragment_and_trailing_slash(self):
        base_id = get_source_id("https://example.com/page")
        variants = (
            "https://example.com/page/",
            "https://example.com/page#section",
        )
        for variant in variants:
            assert get_source_id(variant) == base_id

    def test_reset_clears_ids(self):
        get_source_id("https://example.com/x")
        reset_source_registry()
        # After a reset numbering starts over from 1.
        assert get_source_id("https://example.com/y") == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Summary display registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSummaryDisplay:
    """Callback registry and single-slot semantics of tools.summary_display."""

    def test_emit_noop_without_callback(self):
        # With no callback registered, emitting must simply do nothing.
        summary_display.emit("delta", text="hi")

    def test_emit_invokes_callback(self):
        received = []

        def _record(event, **kw):
            received.append((event, kw))

        summary_display.set_summary_stream_callback(_record)
        summary_display.emit("start", url="https://x.com", title="X")
        summary_display.emit("delta", text="token")

        expected = [
            ("start", {"url": "https://x.com", "title": "X"}),
            ("delta", {"text": "token"}),
        ]
        assert received == expected

    def test_callback_exception_swallowed(self):
        def _explode(event, **kw):
            raise RuntimeError("nope")

        summary_display.set_summary_stream_callback(_explode)
        # emit() is expected to absorb the callback's error.
        summary_display.emit("start", url="u", title="t")

    def test_slot_requires_callback(self):
        assert summary_display.try_acquire_stream_slot(object()) is False

    def test_slot_single_holder(self):
        summary_display.set_summary_stream_callback(lambda e, **kw: None)
        holder = object()
        contender = object()
        assert summary_display.try_acquire_stream_slot(holder) is True
        assert summary_display.try_acquire_stream_slot(contender) is False
        summary_display.release_stream_slot(holder)
        # Once freed, the slot can be claimed by the other token.
        assert summary_display.try_acquire_stream_slot(contender) is True
        summary_display.release_stream_slot(contender)

    def test_release_wrong_token_keeps_slot(self):
        summary_display.set_summary_stream_callback(lambda e, **kw: None)
        holder = object()
        outsider = object()
        assert summary_display.try_acquire_stream_slot(holder)
        # Releasing with a token that does not own the slot is a no-op.
        summary_display.release_stream_slot(outsider)
        assert summary_display.try_acquire_stream_slot(outsider) is False
        summary_display.release_stream_slot(holder)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# web_search_tool annotation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSearchAnnotation:
    """web_search_tool annotates results with source ids and guidance."""

    def _provider(self, results):
        """Build a fake search provider that returns ``results``."""
        fake = MagicMock()
        fake.name = "fake"
        fake.supports_search.return_value = True
        fake.search.return_value = {"success": True, "data": {"web": results}}
        return fake

    def _search(self, provider, *queries):
        """Run web_search_tool once per query against ``provider``.

        Returns the parsed JSON payloads in query order.
        """
        from tools import web_tools

        plugins = patch.object(web_tools, "_ensure_web_plugins_loaded")
        registry = patch("agent.web_search_registry.get_provider", return_value=provider)
        backend = patch.object(web_tools, "_get_search_backend", return_value="fake")
        with plugins, registry, backend:
            return [json.loads(web_tools.web_search_tool(q)) for q in queries]

    def test_results_get_source_ids_and_guidance(self):
        provider = self._provider([
            {"title": "A", "url": "https://a.com", "description": "aaa", "position": 1},
            {"title": "B", "url": "https://b.com", "description": "bbb", "position": 2},
        ])
        (out,) = self._search(provider, "query")
        markers = [hit["source"] for hit in out["data"]["web"][:2]]
        assert markers == ["[1]", "[2]"]
        assert out["citation_guidance"] == CITATION_GUIDANCE_AUTO

    def test_same_url_keeps_id_across_calls(self):
        provider = self._provider([
            {"title": "A", "url": "https://a.com", "description": "aaa", "position": 1},
        ])
        first, second = self._search(provider, "query", "query again")
        assert first["data"]["web"][0]["source"] == second["data"]["web"][0]["source"]

    def test_empty_results_no_guidance(self):
        (out,) = self._search(self._provider([]), "query")
        assert "citation_guidance" not in out
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# web_extract_tool annotation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestExtractAnnotation:
    """web_extract_tool annotates trimmed results with source ids."""

    @staticmethod
    def _extract_provider(pages):
        """Build a fake extract provider whose extract() returns ``pages``."""
        fake = MagicMock()
        fake.name = "fake"
        fake.supports_extract.return_value = True
        fake.extract = MagicMock(return_value=pages)
        return fake

    @staticmethod
    def _extract(provider, urls):
        """Run web_extract_tool (LLM processing off) and parse its JSON."""
        from tools import web_tools

        plugins = patch.object(web_tools, "_ensure_web_plugins_loaded")
        registry = patch("agent.web_search_registry.get_provider", return_value=provider)
        backend = patch.object(web_tools, "_get_extract_backend", return_value="fake")
        aux = patch.object(web_tools, "check_auxiliary_model", return_value=False)
        safe = patch.object(web_tools, "async_is_safe_url", new=_async_true)
        with plugins, registry, backend, aux, safe:
            raw = asyncio.run(web_tools.web_extract_tool(urls, use_llm_processing=False))
            return json.loads(raw)

    def test_trimmed_results_carry_source_and_guidance(self):
        provider = self._extract_provider([
            {"url": "https://a.com", "title": "A", "content": "short content"},
        ])
        out = self._extract(provider, ["https://a.com"])
        assert out["results"][0]["source"] == "[1]"
        assert out["citation_guidance"] == CITATION_GUIDANCE_AUTO

    def test_search_then_extract_share_ids(self):
        from tools import web_tools

        search_provider = MagicMock()
        search_provider.name = "fake"
        search_provider.supports_search.return_value = True
        search_provider.search.return_value = {
            "success": True,
            "data": {"web": [{"title": "A", "url": "https://a.com", "description": "d", "position": 1}]},
        }
        extract_provider = self._extract_provider([
            {"url": "https://a.com", "title": "A", "content": "body"},
        ])

        # Search first so the URL receives its id at search time ...
        plugins = patch.object(web_tools, "_ensure_web_plugins_loaded")
        registry = patch("agent.web_search_registry.get_provider", return_value=search_provider)
        backend = patch.object(web_tools, "_get_search_backend", return_value="fake")
        with plugins, registry, backend:
            search_out = json.loads(web_tools.web_search_tool("q"))

        # ... then extract the same URL and expect the same marker.
        extract_out = self._extract(extract_provider, ["https://a.com"])

        assert search_out["data"]["web"][0]["source"] == extract_out["results"][0]["source"] == "[1]"
|
||||
|
||||
|
||||
async def _async_true(url):
|
||||
return True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Config toggles (web.citations / web.summary_stream)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCitationModes:
    """Resolution of web.citations modes and the web.summary_stream toggle."""

    @staticmethod
    def _web_config(values):
        """Return a patcher making tools.web_tools._load_web_config yield ``values``."""
        return patch("tools.web_tools._load_web_config", return_value=values)

    def test_default_mode_is_auto(self):
        with self._web_config({}):
            mode = _get_citations_mode()
            guidance = _get_citation_guidance()
        assert mode == "auto"
        assert guidance == CITATION_GUIDANCE_AUTO

    def test_always_mode(self):
        with self._web_config({"citations": "always"}):
            guidance = _get_citation_guidance()
        assert guidance == CITATION_GUIDANCE

    def test_off_mode_returns_none(self):
        with self._web_config({"citations": "off"}):
            guidance = _get_citation_guidance()
        assert guidance is None

    def test_invalid_mode_falls_back_to_auto(self):
        with self._web_config({"citations": "bogus"}):
            mode = _get_citations_mode()
        assert mode == "auto"

    def test_off_mode_strips_annotations_from_search(self):
        from tools import web_tools

        hit = {"title": "A", "url": "https://a.com", "description": "d", "position": 1}
        provider = MagicMock()
        provider.name = "fake"
        provider.supports_search.return_value = True
        provider.search.return_value = {"success": True, "data": {"web": [hit]}}

        plugins = patch.object(web_tools, "_ensure_web_plugins_loaded")
        registry = patch("agent.web_search_registry.get_provider", return_value=provider)
        backend = patch.object(web_tools, "_get_search_backend", return_value="fake")
        config = patch.object(web_tools, "_load_web_config", return_value={"citations": "off"})
        with plugins, registry, backend, config:
            out = json.loads(web_tools.web_search_tool("query"))

        assert "citation_guidance" not in out
        assert "source" not in out["data"]["web"][0]

    def test_off_mode_strips_annotations_from_extract(self):
        from tools import web_tools

        provider = MagicMock()
        provider.name = "fake"
        provider.supports_extract.return_value = True
        provider.extract = MagicMock(return_value=[
            {"url": "https://a.com", "title": "A", "content": "body"},
        ])

        plugins = patch.object(web_tools, "_ensure_web_plugins_loaded")
        registry = patch("agent.web_search_registry.get_provider", return_value=provider)
        backend = patch.object(web_tools, "_get_extract_backend", return_value="fake")
        aux = patch.object(web_tools, "check_auxiliary_model", return_value=False)
        safe = patch.object(web_tools, "async_is_safe_url", new=_async_true)
        config = patch.object(web_tools, "_load_web_config", return_value={"citations": "off"})
        with plugins, registry, backend, aux, safe, config:
            out = json.loads(asyncio.run(
                web_tools.web_extract_tool(["https://a.com"], use_llm_processing=False)
            ))

        assert "citation_guidance" not in out
        assert "source" not in out["results"][0]

    def test_summary_stream_toggle(self):
        # A missing key means enabled; an explicit False disables streaming.
        with self._web_config({}):
            enabled_by_default = _summary_stream_enabled()
        assert enabled_by_default is True
        with self._web_config({"summary_stream": False}):
            enabled_when_off = _summary_stream_enabled()
        assert enabled_when_off is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Streaming summarizer fast path
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_stream_chunks(texts):
|
||||
async def _gen():
|
||||
for t in texts:
|
||||
yield SimpleNamespace(
|
||||
choices=[SimpleNamespace(delta=SimpleNamespace(content=t))]
|
||||
)
|
||||
return _gen()
|
||||
|
||||
|
||||
class _FakeAsyncClient:
|
||||
"""Minimal async client whose chat.completions.create returns a stream."""
|
||||
|
||||
def __init__(self, chunks=None, exc=None):
|
||||
self._chunks = chunks
|
||||
self._exc = exc
|
||||
self.kwargs = None
|
||||
outer = self
|
||||
|
||||
class _Completions:
|
||||
async def create(self, **kwargs):
|
||||
outer.kwargs = kwargs
|
||||
if outer._exc:
|
||||
raise outer._exc
|
||||
return _make_stream_chunks(outer._chunks)
|
||||
|
||||
self.chat = SimpleNamespace(completions=_Completions())
|
||||
|
||||
|
||||
class TestStreamSummarizer:
    """Streaming fast path of _try_stream_summarizer and its fallbacks."""

    @staticmethod
    def _summarize(client, **extra):
        """Run _try_stream_summarizer to completion with fixed dummy arguments."""
        return asyncio.run(
            _try_stream_summarizer(client, "model", {}, "sys", "user", 1000, **extra)
        )

    @staticmethod
    def _record_events():
        """Register a recording display callback and return its event list."""
        events = []

        def _record(event, **kw):
            events.append((event, kw))

        summary_display.set_summary_stream_callback(_record)
        return events

    def test_no_callback_returns_none_without_calling(self):
        client = _FakeAsyncClient(chunks=["x"])
        assert self._summarize(client) is None
        # The API must never be reached when nothing is listening.
        assert client.kwargs is None

    def test_streams_and_returns_summary(self):
        events = self._record_events()
        client = _FakeAsyncClient(chunks=["Hello ", "world"])

        result = self._summarize(
            client, context_str="Title: T\nSource: https://a.com\n\n"
        )

        assert result == "Hello world"
        assert client.kwargs["stream"] is True
        assert [name for name, _ in events] == ["start", "delta", "delta", "end"]
        _, start_payload = events[0]
        _, end_payload = events[-1]
        assert start_payload == {"url": "https://a.com", "title": "T"}
        assert end_payload["ok"] is True
        assert end_payload["char_count"] == len("Hello world")
        # The display slot must be free again afterwards.
        assert summary_display.try_acquire_stream_slot(object()) is True

    def test_provider_error_falls_back(self):
        events = self._record_events()
        client = _FakeAsyncClient(exc=RuntimeError("stream unsupported"))

        assert self._summarize(client) is None
        # The error happens before "start", so no end event is required;
        # any that does appear must report failure.
        for name, payload in events:
            if name == "end":
                assert payload["ok"] is False
        # The slot is released even on failure.
        assert summary_display.try_acquire_stream_slot(object()) is True

    def test_empty_stream_falls_back(self):
        summary_display.set_summary_stream_callback(lambda e, **kw: None)
        client = _FakeAsyncClient(chunks=[])
        assert self._summarize(client) is None

    def test_slot_busy_returns_none(self):
        summary_display.set_summary_stream_callback(lambda e, **kw: None)
        holder = object()
        assert summary_display.try_acquire_stream_slot(holder)
        try:
            client = _FakeAsyncClient(chunks=["x"])
            assert self._summarize(client) is None
            assert client.kwargs is None
        finally:
            summary_display.release_stream_slot(holder)
|
||||
85
tools/summary_display.py
Normal file
85
tools/summary_display.py
Normal file
@@ -0,0 +1,85 @@
|
||||
"""Live display hook for web-extract summarization streams.
|
||||
|
||||
When ``web_extract`` summarizes a long page, the summarizer LLM call can take
|
||||
many seconds with no visible activity. Front-ends (currently the interactive
|
||||
CLI) can register a callback here to mirror the summary tokens into a live
|
||||
box — the same UX as streaming reasoning blocks.
|
||||
|
||||
Design constraints:
|
||||
- Zero coupling: ``tools/web_tools.py`` only calls module functions here; if
|
||||
no front-end registered a callback (gateway, cron, subagents, tests) every
|
||||
call is a cheap no-op and the summarizer runs exactly as before.
|
||||
- One stream at a time: ``web_extract`` summarizes multiple pages in
|
||||
parallel. Only the first task to acquire the display slot streams to the
|
||||
terminal; the others run silently. This avoids interleaved boxes.
|
||||
|
||||
Callback protocol::
|
||||
|
||||
callback(event: str, **kwargs)
|
||||
|
||||
event == "start": kwargs = {"url": str, "title": str}
|
||||
event == "delta": kwargs = {"text": str}
|
||||
event == "end": kwargs = {"char_count": int, "ok": bool}
|
||||
|
||||
The callback must never raise — but we guard anyway.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import threading
|
||||
from typing import Callable, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)

# Front-end display callback; None until a front-end registers one.
_callback: Optional[Callable] = None

# Lock taken around every check/update of _slot_holder.
_slot_lock = threading.Lock()

# Token of the stream that currently owns the live display; None when free.
_slot_holder: Optional[object] = None
|
||||
|
||||
|
||||
def set_summary_stream_callback(callback: Optional[Callable]) -> None:
    """Install ``callback`` as the live summary display hook.

    Passing ``None`` unregisters the current callback. The previous
    callback, if any, is simply replaced.
    """
    global _callback
    _callback = callback
|
||||
|
||||
|
||||
def get_summary_stream_callback() -> Optional[Callable]:
    """Return the currently registered display callback (``None`` if unset)."""
    return _callback
|
||||
|
||||
|
||||
def try_acquire_stream_slot(token: object) -> bool:
    """Claim the single live-display slot for ``token`` without blocking.

    Returns:
        True when ``token`` now owns the slot and the caller may stream to
        the display; the caller MUST later call :func:`release_stream_slot`
        with the same token. False when no callback is registered or
        another token already holds the slot.
    """
    global _slot_holder
    # Nothing is listening: there is no display to claim.
    if _callback is None:
        return False
    with _slot_lock:
        if _slot_holder is not None:
            return False
        _slot_holder = token
        return True
|
||||
|
||||
|
||||
def release_stream_slot(token: object) -> None:
    """Free the live-display slot, but only when ``token`` is its owner.

    Ownership is compared by identity, so a call with any other token
    leaves the slot untouched.
    """
    global _slot_holder
    with _slot_lock:
        if _slot_holder is not token:
            return
        _slot_holder = None
|
||||
|
||||
|
||||
def emit(event: str, **kwargs) -> None:
    """Forward ``event`` and its payload to the registered callback.

    Does nothing when no callback is registered. Any ``Exception`` raised
    by the callback is logged at debug level and suppressed.
    """
    # Snapshot the global so the check and the call use the same object.
    handler = _callback
    if handler is not None:
        try:
            handler(event, **kwargs)
        except Exception:
            logger.debug("summary display callback failed for event %s", event, exc_info=True)
|
||||
@@ -41,6 +41,7 @@ import logging
|
||||
import os
|
||||
import re
|
||||
import asyncio
|
||||
import threading
|
||||
from typing import List, Dict, Any, Optional, TYPE_CHECKING
|
||||
import httpx # noqa: F401 — kept at module top so tests can patch tools.web_tools.httpx
|
||||
# After the web-provider plugin migration (PR #25182), the Firecrawl SDK
|
||||
@@ -346,6 +347,105 @@ def _web_requires_env() -> list[str]:
|
||||
|
||||
DEFAULT_MIN_LENGTH_FOR_SUMMARIZATION = 5000
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Source registry — stable numbered citation IDs for grounding
|
||||
# ---------------------------------------------------------------------------
|
||||
# Perplexity-style grounding works because the model only ever emits small
|
||||
# stable integers ("[3]") that were assigned at retrieval time — it cannot
|
||||
# hallucinate a URL. We keep a process-wide URL → ID map so the same page
|
||||
# keeps the same citation number across web_search and web_extract calls
|
||||
# within a session, and annotate every result with its marker.
|
||||
#
|
||||
# Research basis: ALCE (arXiv:2305.14627) shows citing-during-generation from
|
||||
# numbered snippets beats post-hoc attribution; WebGPT (arXiv:2112.09332)
|
||||
# grounds answers in verbatim quotes collected at browse time. The summarizer
|
||||
# prompts below preserve verbatim quotes for that reason.
|
||||
_source_registry: Dict[str, int] = {}
|
||||
_source_registry_lock = threading.Lock()
|
||||
|
||||
|
||||
def _normalize_source_url(url: str) -> str:
|
||||
"""Canonicalize a URL for registry identity (strip fragment, trailing /)."""
|
||||
u = (url or "").strip()
|
||||
if "#" in u:
|
||||
u = u.split("#", 1)[0]
|
||||
return u.rstrip("/") or u
|
||||
|
||||
|
||||
def get_source_id(url: str) -> int:
    """Look up the citation id for ``url``, minting the next one if unseen.

    URLs are compared after normalization, so variants that normalize to
    the same key share an id. New ids continue from the current registry
    size, starting at 1.
    """
    key = _normalize_source_url(url)
    with _source_registry_lock:
        if key not in _source_registry:
            _source_registry[key] = len(_source_registry) + 1
        return _source_registry[key]
|
||||
|
||||
|
||||
def reset_source_registry() -> None:
    """Forget every URL → citation-id assignment (tests / new sessions).

    The next :func:`get_source_id` call starts numbering from 1 again.
    """
    with _source_registry_lock:
        _source_registry.clear()
|
||||
|
||||
|
||||
# Unconditional guidance: selected when the citations mode is "always".
CITATION_GUIDANCE = (
    "GROUNDING: Each result has a stable source id like [3]. When you answer "
    "the user using facts from these sources, cite by placing the bracketed "
    "id(s) immediately after each sentence they support, e.g. 'Ice is less "
    "dense than water.[1][2]' — no space before the bracket, at most 3 ids "
    "per sentence. Cite while writing, per supported sentence (not one "
    "citation dumped at the end). Only cite ids that appear in tool results "
    "you actually received; never invent an id or cite a source you did not "
    "read. Claims from your own knowledge get no citation. If sources "
    "conflict, present both with their ids. End the answer with a 'Sources:' "
    "list mapping each cited id to its URL (e.g. '[1] https://...') — list "
    "only ids you cited."
)

# Conditional guidance for "auto" mode: the model itself decides, per
# request, whether the answer is research/report-shaped and needs inline
# citations. Mirrors Perplexity's leaked-prompt approach of exempting query
# classes (translation, creative writing) via instruction rather than a
# separate classifier.
CITATION_GUIDANCE_AUTO = (
    "GROUNDING (conditional): Each result has a stable source id like [3]. "
    "IF the user's request is research-, report-, or fact-finding-shaped — "
    "they asked for a sourced/grounded answer, a summary of information from "
    "the web, a comparison, news, or anything where claim provenance matters "
    "— then cite: place the bracketed id(s) immediately after each sentence "
    "they support, e.g. 'Ice is less dense than water.[1][2]' — no space "
    "before the bracket, at most 3 ids per sentence, cite while writing (not "
    "one citation dumped at the end), and end with a 'Sources:' list mapping "
    "each cited id to its URL — list only ids you cited. Only cite ids that "
    "appear in tool results you actually received; never invent an id. "
    "Claims from your own knowledge get no citation. IF instead the search "
    "is incidental to a different task (quick lookup mid-coding, checking "
    "syntax/versions, casual conversation, creative writing), skip inline "
    "citations and just answer naturally — mention a URL only if the user "
    "would plausibly want the link."
)
|
||||
|
||||
|
||||
def _get_citations_mode() -> str:
    """Return the web citations mode: 'auto' (default), 'always', or 'off'.

    The ``citations`` value of the web config is stringified, stripped and
    lower-cased. Missing, empty, or unrecognized values fall back to
    ``'auto'``; a boolean ``False`` is treated as ``'off'``.
    """
    raw = _load_web_config().get("citations", "auto")
    # NOTE(review): assumes the web config may come from a YAML 1.1 loader,
    # where an unquoted `off` is parsed as boolean False — confirm against
    # _load_web_config. Without this branch such a value would be coerced
    # to "auto", the opposite of what the user wrote.
    if raw is False:
        return "off"
    mode = str(raw or "auto").strip().lower()
    return mode if mode in {"auto", "always", "off"} else "auto"
|
||||
|
||||
|
||||
def _get_citation_guidance() -> Optional[str]:
    """Pick the guidance text matching the active citations mode.

    Returns ``None`` for ``'off'``, :data:`CITATION_GUIDANCE` for
    ``'always'``, and :data:`CITATION_GUIDANCE_AUTO` for anything else.
    """
    by_mode = {"off": None, "always": CITATION_GUIDANCE}
    return by_mode.get(_get_citations_mode(), CITATION_GUIDANCE_AUTO)
|
||||
|
||||
|
||||
def _summary_stream_enabled() -> bool:
    """Report whether live summary streaming to a registered display is on.

    Reads ``summary_stream`` from the web config; a missing key counts as
    enabled, and any other value is interpreted by truthiness.
    """
    web_config = _load_web_config()
    return bool(web_config.get("summary_stream", True))
|
||||
|
||||
|
||||
def _is_nous_auxiliary_client(client: Any) -> bool:
|
||||
"""Return True when the resolved auxiliary backend is Nous Portal."""
|
||||
from urllib.parse import urlparse
|
||||
@@ -478,6 +578,96 @@ async def process_content_with_llm(
|
||||
return truncated
|
||||
|
||||
|
||||
async def _try_stream_summarizer(
    aux_client: Any,
    model: str,
    extra_body: Dict[str, Any],
    system_prompt: str,
    user_prompt: str,
    max_tokens: int,
    context_str: str = "",
) -> Optional[str]:
    """Attempt a streaming summarizer call mirrored to the live display.

    Args:
        aux_client: Client exposing an awaitable ``chat.completions.create``
            that returns an async-iterable stream when ``stream=True``.
        model: Model name passed through to the client.
        extra_body: Optional provider-specific body; sent only when truthy.
        system_prompt: System message content.
        user_prompt: User message content.
        max_tokens: Completion token limit passed through to the client.
        context_str: Optional header text; its ``Title: ...`` and
            ``Source: ...`` lines supply the display header.

    Returns:
        The full summary text on success, or ``None`` to signal the caller
        to fall back to the standard non-streaming path (no callback
        registered, display slot busy, provider doesn't support streaming,
        empty stream, or any error mid-stream).

    Ordinary exceptions are never raised. Cancellation and other
    ``BaseException``s propagate, but the display box is closed and the
    slot released first.
    """
    from tools.summary_display import (
        emit,
        release_stream_slot,
        try_acquire_stream_slot,
    )

    token = object()
    if not try_acquire_stream_slot(token):
        return None

    # True between a "start" and its matching "end" event, so the cleanup
    # path knows whether a box is still open on the display.
    started = False
    try:
        # Parse "Title: ..." / "Source: ..." lines out of context_str for
        # the display header.
        url = title = ""
        for line in (context_str or "").splitlines():
            if line.startswith("Source: "):
                url = line[len("Source: "):].strip()
            elif line.startswith("Title: "):
                title = line[len("Title: "):].strip()

        from agent.auxiliary_client import _get_task_timeout
        timeout = _get_task_timeout("web_extract")

        kwargs: Dict[str, Any] = {
            "model": model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            "temperature": 0.1,
            "max_tokens": max_tokens,
            "stream": True,
        }
        if timeout:
            kwargs["timeout"] = timeout
        if extra_body:
            kwargs["extra_body"] = extra_body

        stream = await aux_client.chat.completions.create(**kwargs)

        # Open the box only once the provider accepted the streaming call,
        # so a provider without stream support never flashes an empty box.
        emit("start", url=url, title=title)
        started = True

        parts: List[str] = []
        async for chunk in stream:
            try:
                delta = chunk.choices[0].delta
            except (AttributeError, IndexError):
                # Chunk without choices/delta — nothing to display.
                continue
            text = getattr(delta, "content", None)
            if text:
                parts.append(text)
                emit("delta", text=text)

        summary = "".join(parts).strip()
        emit("end", char_count=len(summary), ok=bool(summary))
        started = False
        if summary:
            return summary
        # Empty stream (reasoning-only model, provider quirk) — fall back.
        logger.debug("Streaming summarizer returned empty content; falling back")
        return None
    except Exception as e:
        # Streaming is best-effort: adapters without stream support,
        # transport errors, etc. all fall back to the robust path.
        logger.debug("Streaming summarizer failed (%s); falling back to non-streaming", str(e)[:120])
        return None
    finally:
        # Close a still-open box here rather than in ``except Exception``:
        # asyncio.CancelledError is a BaseException, so a cancelled task
        # would otherwise release the slot but leave the box open.
        if started:
            emit("end", char_count=0, ok=False)
        release_stream_slot(token)
|
||||
|
||||
|
||||
async def _call_summarizer_llm(
|
||||
content: str,
|
||||
context_str: str,
|
||||
@@ -511,6 +701,11 @@ Important guidelines for chunk processing:
|
||||
4. Use bullet points and structured formatting for easy synthesis later
|
||||
5. Note any references to other sections (e.g., "as mentioned earlier", "see below") without trying to resolve them
|
||||
|
||||
GROUNDING RULES (critical — this summary will be used to answer questions with citations to this source):
|
||||
- Keep citable facts as short VERBATIM quotes in "double quotes"; keep numbers, dates, names, and figures EXACTLY as written
|
||||
- Use ONLY information from the provided section. Never add outside knowledge, never infer facts the text does not state
|
||||
- If the section is ambiguous or silent on something it seems like it should cover, say so explicitly
|
||||
|
||||
Your output will be combined with summaries of other sections, so focus on thorough extraction rather than narrative flow."""
|
||||
|
||||
user_prompt = f"""Extract key information from this SECTION of a larger document:
|
||||
@@ -531,6 +726,12 @@ Create a well-structured markdown summary that includes:
|
||||
2. Comprehensive summary of all other important information
|
||||
3. Proper markdown formatting with headers, bullets, and emphasis
|
||||
|
||||
GROUNDING RULES (critical — this summary will be used to answer questions with citations to this page):
|
||||
- Preserve citable facts as short VERBATIM quotes in "double quotes"; keep all numbers, dates, names, versions, and figures EXACTLY as the page states them
|
||||
- Use ONLY information present on the page. Never blend in outside knowledge, never fill gaps with plausible-sounding details
|
||||
- If the page does NOT cover something a reader would expect, note that explicitly (e.g. "The page does not mention pricing")
|
||||
- Attribute claims the page itself attributes (e.g. 'the author claims...', 'according to the cited study...') rather than stating them as bare fact
|
||||
|
||||
Your goal is to preserve ALL important information while reducing length. Never lose key facts, figures, insights, or actionable information. Make it scannable and well-organized."""
|
||||
|
||||
user_prompt = f"""Please process this web content and create a comprehensive markdown summary:
|
||||
@@ -552,6 +753,23 @@ Create a markdown summary that captures all key information in a well-organized,
|
||||
if aux_client is None or not effective_model:
|
||||
logger.warning("No auxiliary model available for web content processing")
|
||||
return None
|
||||
|
||||
# ── Live-display streaming fast path ──────────────────────
|
||||
# When a front-end (CLI) registered a summary display callback
|
||||
# and no other summarization stream owns the display slot, run
|
||||
# the call in streaming mode and mirror tokens to the UI.
|
||||
# Any failure falls through to the standard non-streaming path
|
||||
# with its full retry/fallback machinery.
|
||||
# Gated by web.summary_stream in config.yaml (default: on).
|
||||
if attempt == 0 and not is_chunk and _summary_stream_enabled():
|
||||
streamed = await _try_stream_summarizer(
|
||||
aux_client, effective_model, extra_body,
|
||||
system_prompt, user_prompt, max_tokens,
|
||||
context_str=context_str,
|
||||
)
|
||||
if streamed:
|
||||
return streamed
|
||||
|
||||
call_kwargs = {
|
||||
"task": "web_extract",
|
||||
"model": effective_model,
|
||||
@@ -1004,6 +1222,22 @@ def web_search_tool(query: str, limit: int = 5) -> str:
|
||||
)
|
||||
response_data = provider.search(query, limit)
|
||||
|
||||
# ── Grounding annotations ──────────────────────────────────────
|
||||
# Tag each result with its stable citation id and attach citation
|
||||
# guidance so the model grounds its answer Perplexity-style.
|
||||
# Gated by web.citations in config.yaml ("auto" | "always" | "off").
|
||||
try:
|
||||
guidance = _get_citation_guidance()
|
||||
if guidance is not None:
|
||||
web_results = response_data.get("data", {}).get("web", []) if isinstance(response_data, dict) else []
|
||||
for r in web_results:
|
||||
if isinstance(r, dict) and r.get("url"):
|
||||
r["source"] = f"[{get_source_id(r['url'])}]"
|
||||
if web_results and isinstance(response_data, dict):
|
||||
response_data["citation_guidance"] = guidance
|
||||
except Exception:
|
||||
logger.debug("Failed to annotate search results with source ids", exc_info=True)
|
||||
|
||||
debug_call_data["results_count"] = len(response_data.get("data", {}).get("web", []))
|
||||
result_json = json.dumps(response_data, indent=2, ensure_ascii=False)
|
||||
debug_call_data["final_response_size"] = len(result_json)
|
||||
@@ -1278,17 +1512,21 @@ async def web_extract_tool(
|
||||
logger.info("%s (%d characters)", url, content_length)
|
||||
|
||||
# Trim output to minimal fields per entry: title, content, error
|
||||
_extract_guidance = _get_citation_guidance()
|
||||
trimmed_results = [
|
||||
{
|
||||
"url": r.get("url", ""),
|
||||
"title": r.get("title", ""),
|
||||
**({"source": f"[{get_source_id(r['url'])}]"} if (_extract_guidance is not None and r.get("url")) else {}),
|
||||
"content": r.get("content", ""),
|
||||
"error": r.get("error"),
|
||||
**({ "blocked_by_policy": r["blocked_by_policy"]} if "blocked_by_policy" in r else {}),
|
||||
}
|
||||
for r in response.get("results", [])
|
||||
]
|
||||
trimmed_response = {"results": trimmed_results}
|
||||
trimmed_response: Dict[str, Any] = {"results": trimmed_results}
|
||||
if _extract_guidance is not None and any(r.get("content") for r in trimmed_results):
|
||||
trimmed_response["citation_guidance"] = _extract_guidance
|
||||
if _free_parallel_extract:
|
||||
# Credit Parallel's free Search MCP (drives the "[Parallel]" UI tag
|
||||
# + lets the model cite the source). Free tier only.
|
||||
@@ -1508,7 +1746,7 @@ from tools.registry import registry, tool_error
|
||||
|
||||
WEB_SEARCH_SCHEMA = {
|
||||
"name": "web_search",
|
||||
"description": "Search the web for information. Returns up to 5 results by default with titles, URLs, and descriptions. The query is passed through to the configured backend, so operators such as site:domain, filetype:pdf, intitle:word, -term, and \"exact phrase\" may work when the backend supports them.",
|
||||
"description": "Search the web for information. Returns up to 5 results by default with titles, URLs, and descriptions. Results may carry a stable citation id in a 'source' field (e.g. \"[3]\") with citation guidance in the response; follow that guidance when present. The query is passed through to the configured backend, so operators such as site:domain, filetype:pdf, intitle:word, -term, and \"exact phrase\" may work when the backend supports them.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@@ -1530,7 +1768,7 @@ WEB_SEARCH_SCHEMA = {
|
||||
|
||||
WEB_EXTRACT_SCHEMA = {
|
||||
"name": "web_extract",
|
||||
"description": "Extract content from web page URLs. Returns page content in markdown format. Also works with PDF URLs (arxiv papers, documents, etc.) — pass the PDF link directly and it converts to markdown text. Pages under 5000 chars return full markdown; larger pages are LLM-summarized and capped at ~5000 chars per page. Pages over 2M chars are refused. If a URL fails or times out, use the browser tool to access it instead.",
|
||||
"description": "Extract content from web page URLs. Returns page content in markdown format. Results may carry a stable citation id in a 'source' field (e.g. \"[3]\") with citation guidance in the response; follow that guidance when present. Also works with PDF URLs (arxiv papers, documents, etc.) — pass the PDF link directly and it converts to markdown text. Pages under 5000 chars return full markdown; larger pages are LLM-summarized and capped at ~5000 chars per page. Pages over 2M chars are refused. If a URL fails or times out, use the browser tool to access it instead.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
||||
@@ -1592,6 +1592,22 @@ web:
|
||||
|
||||
**Parallel search modes:** Set `PARALLEL_SEARCH_MODE` to control search behavior — `fast`, `one-shot`, or `agentic` (default: `agentic`).
|
||||
|
||||
### Grounded citations
|
||||
|
||||
Web results carry stable numbered source ids (`[1]`, `[2]`, …) that the agent cites inline, Perplexity-style, with a `Sources:` list at the end of the answer. Control this with `web.citations`:
|
||||
|
||||
```yaml
|
||||
web:
|
||||
citations: auto # auto (default) | always | off
|
||||
summary_stream: true # live summarization box in the CLI for long pages
|
||||
```
|
||||
|
||||
- **`auto`** (default) — the agent cites inline only when the request is research/report-shaped (sourced summaries, comparisons, news, fact-finding). Incidental lookups (checking syntax mid-coding, casual questions) stay citation-free.
|
||||
- **`always`** — cite inline whenever the answer uses web results.
|
||||
- **`off`** — no source ids or citation guidance (pre-feature behavior).
|
||||
|
||||
`web.summary_stream` (default: `true`) toggles the live "Summarizing" box shown in the CLI while `web_extract` condenses long pages; gateway and cron runs are unaffected by this setting.
|
||||
|
||||
**Exa:** Set `EXA_API_KEY` in `~/.hermes/.env`. Supports `category` filtering (`company`, `research paper`, `news`, `people`, `personal site`, `pdf`) and domain/date filters.
|
||||
|
||||
## Browser
|
||||
|
||||
Reference in New Issue
Block a user