Compare commits

...

2 Commits

Author SHA1 Message Date
teknium1
f6b0b09988 feat(web): make grounded citations + summary streaming toggleable via config.yaml
- web.citations: auto (default) | always | off
  * auto: new CITATION_GUIDANCE_AUTO instructs the model to cite inline
    only for research/report/fact-finding-shaped requests and answer
    naturally for incidental lookups (Perplexity's leaked prompts handle
    query-class exemptions the same way - instruction, not classifier)
  * always: unconditional guidance (previous behavior)
  * off: no source ids, no guidance - byte-identical to pre-feature output
- web.summary_stream: true (default) | false - gates the live CLI
  summarization box
- Tool schema descriptions made mode-agnostic ('results MAY carry a
  source field... follow guidance when present') so the static schema
  stays byte-stable for a conversation regardless of config
- Docs: configuration.md 'Grounded citations' section
- 7 new tests covering mode resolution + off-mode output shape
2026-06-12 12:25:20 -07:00
teknium1
853d32fb90 feat(web): Perplexity-style grounded citations + live summary streaming box
Grounding (web_search + web_extract):
- Process-wide source registry assigns stable numbered citation ids
  (url -> [n]) shared across search and extract calls, so the model only
  ever emits small integers it received - it cannot hallucinate a URL
  (the structural trick behind Perplexity's grounding)
- Every search/extract result carries its marker in a 'source' field;
  responses with content include a compact citation_guidance block
  modeled on Perplexity's leaked prompts (per-sentence inline [n] cites,
  max 3 per sentence, cite-while-writing, never invent ids, Sources list)
- Summarizer prompts hardened per ALCE (arXiv:2305.14627) and WebGPT
  (arXiv:2112.09332): preserve citable facts as verbatim quotes, keep
  figures exact, never blend outside knowledge, flag gaps explicitly

Live summary streaming (CLI):
- New tools/summary_display.py: tiny callback registry + single display
  slot; no-op everywhere a front-end doesn't register (gateway, cron,
  subagents, tests)
- web_extract summarization now tries a streaming call first when a
  display is attached, mirroring tokens into a dim 'Summarizing . <url>'
  box in the CLI (same UX as reasoning blocks); any streaming failure
  falls back to the existing non-streaming retry/fallback path
- Parallel page summaries race for the slot; only one streams, others
  run silently - no interleaved boxes
2026-06-12 03:24:42 -07:00
6 changed files with 789 additions and 3 deletions

56
cli.py
View File

@@ -3188,6 +3188,13 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
self.bell_on_complete = CLI_CONFIG["display"].get("bell_on_complete", False)
# show_reasoning: display model thinking/reasoning before the response
self.show_reasoning = CLI_CONFIG["display"].get("show_reasoning", False)
# Live web_extract summarization box (reasoning-style). Registered
# process-wide; no-op for runs that never summarize a web page.
try:
from tools.summary_display import set_summary_stream_callback
set_summary_stream_callback(self._on_summary_stream)
except Exception:
logger.debug("Failed to register summary stream callback", exc_info=True)
_configure_output_history(
enabled=CLI_CONFIG["display"].get("persistent_output", True),
max_lines=CLI_CONFIG["display"].get("persistent_output_max_lines", 200),
@@ -4663,6 +4670,55 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
self._deferred_content = ""
self._emit_stream_text(deferred)
# ── Web-extract summary streaming (reasoning-style box) ─────────────
def _on_summary_stream(self, event: str, **kwargs) -> None:
"""Render web_extract summarization tokens in a dim live box.
Mirrors the reasoning-box UX: opens a dim 'Summarizing' box when the
summarizer LLM starts streaming, streams its tokens line-by-line,
and closes the box with a char count when done. Registered with
tools.summary_display; called from the summarizer's event loop
thread, so output goes through _cprint (patch_stdout-safe).
"""
try:
if event == "start":
url = kwargs.get("url", "")
w = self._scrollback_box_width()
label = " Summarizing "
if url:
short = url if len(url) <= w - 20 else url[: w - 23] + "..."
label = f" Summarizing · {short} "
fill = w - 2 - len(label)
_cprint(f"\n{_DIM}┌─{label}{'' * max(fill - 1, 0)}{_RST}")
self._summary_box_opened = True
self._summary_buf = ""
elif event == "delta":
if not getattr(self, "_summary_box_opened", False):
return
self._summary_buf = getattr(self, "_summary_buf", "") + kwargs.get("text", "")
while "\n" in self._summary_buf:
line, self._summary_buf = self._summary_buf.split("\n", 1)
_cprint(f"{_DIM}{line}{_RST}")
if len(self._summary_buf) > 80:
_cprint(f"{_DIM}{self._summary_buf}{_RST}")
self._summary_buf = ""
elif event == "end":
if not getattr(self, "_summary_box_opened", False):
return
buf = getattr(self, "_summary_buf", "")
if buf:
_cprint(f"{_DIM}{buf}{_RST}")
self._summary_buf = ""
w = self._scrollback_box_width()
chars = kwargs.get("char_count", 0)
tail = f" {chars:,} chars " if kwargs.get("ok") else " summarization fell back "
fill = w - 2 - len(tail)
_cprint(f"{_DIM}{'' * max(fill - 1, 0)}{tail}{_RST}")
self._summary_box_opened = False
except Exception:
logger.debug("Summary stream render failed", exc_info=True)
def _stream_delta(self, text) -> None:
"""Line-buffered streaming callback for real-time token rendering.

View File

@@ -1017,6 +1017,15 @@ DEFAULT_CONFIG = {
"backend": "", # shared fallback — applies to both search and extract
"search_backend": "", # per-capability override for web_search (e.g. "searxng")
"extract_backend": "", # per-capability override for web_extract (e.g. "native")
# Grounded citations on web results:
# "auto" — results carry stable source ids; the model is instructed to
# cite inline only for research/report-style requests
# "always" — model is instructed to cite inline whenever it uses results
# "off" — no source ids, no citation guidance (pre-feature behavior)
"citations": "auto",
# Stream web_extract page summarization live into a reasoning-style
# box in the CLI (no effect on gateway/cron).
"summary_stream": True,
},
"browser": {

View File

@@ -0,0 +1,382 @@
"""Tests for web grounding citations + live summary display streaming.
Covers:
- tools/summary_display.py callback registry + single-slot semantics
- source registry stable citation IDs
- web_search_tool result annotation (source ids + citation_guidance)
- web_extract_tool trimmed-output annotation
- _try_stream_summarizer streaming fast path + fallback semantics
"""
import asyncio
import json
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import pytest
from tools import summary_display
from tools.web_tools import (
CITATION_GUIDANCE,
CITATION_GUIDANCE_AUTO,
_get_citation_guidance,
_get_citations_mode,
_summary_stream_enabled,
_try_stream_summarizer,
get_source_id,
reset_source_registry,
)
@pytest.fixture(autouse=True)
def _clean_state():
reset_source_registry()
summary_display.set_summary_stream_callback(None)
summary_display._slot_holder = None
yield
reset_source_registry()
summary_display.set_summary_stream_callback(None)
summary_display._slot_holder = None
# ---------------------------------------------------------------------------
# Source registry
# ---------------------------------------------------------------------------
class TestSourceRegistry:
def test_ids_are_stable_and_sequential(self):
a = get_source_id("https://example.com/a")
b = get_source_id("https://example.com/b")
assert (a, b) == (1, 2)
# Same URL → same id
assert get_source_id("https://example.com/a") == a
def test_normalization_fragment_and_trailing_slash(self):
a = get_source_id("https://example.com/page")
assert get_source_id("https://example.com/page/") == a
assert get_source_id("https://example.com/page#section") == a
def test_reset_clears_ids(self):
get_source_id("https://example.com/x")
reset_source_registry()
assert get_source_id("https://example.com/y") == 1
# ---------------------------------------------------------------------------
# Summary display registry
# ---------------------------------------------------------------------------
class TestSummaryDisplay:
def test_emit_noop_without_callback(self):
# Must not raise
summary_display.emit("delta", text="hi")
def test_emit_invokes_callback(self):
events = []
summary_display.set_summary_stream_callback(
lambda event, **kw: events.append((event, kw))
)
summary_display.emit("start", url="https://x.com", title="X")
summary_display.emit("delta", text="token")
assert events == [
("start", {"url": "https://x.com", "title": "X"}),
("delta", {"text": "token"}),
]
def test_callback_exception_swallowed(self):
def boom(event, **kw):
raise RuntimeError("nope")
summary_display.set_summary_stream_callback(boom)
summary_display.emit("start", url="u", title="t") # must not raise
def test_slot_requires_callback(self):
token = object()
assert summary_display.try_acquire_stream_slot(token) is False
def test_slot_single_holder(self):
summary_display.set_summary_stream_callback(lambda e, **kw: None)
t1, t2 = object(), object()
assert summary_display.try_acquire_stream_slot(t1) is True
assert summary_display.try_acquire_stream_slot(t2) is False
summary_display.release_stream_slot(t1)
assert summary_display.try_acquire_stream_slot(t2) is True
summary_display.release_stream_slot(t2)
def test_release_wrong_token_keeps_slot(self):
summary_display.set_summary_stream_callback(lambda e, **kw: None)
t1, t2 = object(), object()
assert summary_display.try_acquire_stream_slot(t1)
summary_display.release_stream_slot(t2) # not the holder — no-op
assert summary_display.try_acquire_stream_slot(t2) is False
summary_display.release_stream_slot(t1)
# ---------------------------------------------------------------------------
# web_search_tool annotation
# ---------------------------------------------------------------------------
class TestSearchAnnotation:
def _provider(self, results):
provider = MagicMock()
provider.name = "fake"
provider.supports_search.return_value = True
provider.search.return_value = {"success": True, "data": {"web": results}}
return provider
def test_results_get_source_ids_and_guidance(self):
from tools import web_tools
provider = self._provider([
{"title": "A", "url": "https://a.com", "description": "aaa", "position": 1},
{"title": "B", "url": "https://b.com", "description": "bbb", "position": 2},
])
with patch.object(web_tools, "_ensure_web_plugins_loaded"), \
patch("agent.web_search_registry.get_provider", return_value=provider), \
patch.object(web_tools, "_get_search_backend", return_value="fake"):
out = json.loads(web_tools.web_search_tool("query"))
web = out["data"]["web"]
assert web[0]["source"] == "[1]"
assert web[1]["source"] == "[2]"
assert out["citation_guidance"] == CITATION_GUIDANCE_AUTO
def test_same_url_keeps_id_across_calls(self):
from tools import web_tools
provider = self._provider([
{"title": "A", "url": "https://a.com", "description": "aaa", "position": 1},
])
with patch.object(web_tools, "_ensure_web_plugins_loaded"), \
patch("agent.web_search_registry.get_provider", return_value=provider), \
patch.object(web_tools, "_get_search_backend", return_value="fake"):
first = json.loads(web_tools.web_search_tool("query"))
second = json.loads(web_tools.web_search_tool("query again"))
assert first["data"]["web"][0]["source"] == second["data"]["web"][0]["source"]
def test_empty_results_no_guidance(self):
from tools import web_tools
provider = self._provider([])
with patch.object(web_tools, "_ensure_web_plugins_loaded"), \
patch("agent.web_search_registry.get_provider", return_value=provider), \
patch.object(web_tools, "_get_search_backend", return_value="fake"):
out = json.loads(web_tools.web_search_tool("query"))
assert "citation_guidance" not in out
# ---------------------------------------------------------------------------
# web_extract_tool annotation
# ---------------------------------------------------------------------------
class TestExtractAnnotation:
def test_trimmed_results_carry_source_and_guidance(self):
from tools import web_tools
provider = MagicMock()
provider.name = "fake"
provider.supports_extract.return_value = True
provider.extract = MagicMock(return_value=[
{"url": "https://a.com", "title": "A", "content": "short content"},
])
with patch.object(web_tools, "_ensure_web_plugins_loaded"), \
patch("agent.web_search_registry.get_provider", return_value=provider), \
patch.object(web_tools, "_get_extract_backend", return_value="fake"), \
patch.object(web_tools, "check_auxiliary_model", return_value=False), \
patch.object(web_tools, "async_is_safe_url", new=_async_true):
out = json.loads(asyncio.run(
web_tools.web_extract_tool(["https://a.com"], use_llm_processing=False)
))
assert out["results"][0]["source"] == "[1]"
assert out["citation_guidance"] == CITATION_GUIDANCE_AUTO
def test_search_then_extract_share_ids(self):
from tools import web_tools
search_provider = MagicMock()
search_provider.name = "fake"
search_provider.supports_search.return_value = True
search_provider.search.return_value = {
"success": True,
"data": {"web": [{"title": "A", "url": "https://a.com", "description": "d", "position": 1}]},
}
extract_provider = MagicMock()
extract_provider.name = "fake"
extract_provider.supports_extract.return_value = True
extract_provider.extract = MagicMock(return_value=[
{"url": "https://a.com", "title": "A", "content": "body"},
])
with patch.object(web_tools, "_ensure_web_plugins_loaded"), \
patch("agent.web_search_registry.get_provider", return_value=search_provider), \
patch.object(web_tools, "_get_search_backend", return_value="fake"):
search_out = json.loads(web_tools.web_search_tool("q"))
with patch.object(web_tools, "_ensure_web_plugins_loaded"), \
patch("agent.web_search_registry.get_provider", return_value=extract_provider), \
patch.object(web_tools, "_get_extract_backend", return_value="fake"), \
patch.object(web_tools, "check_auxiliary_model", return_value=False), \
patch.object(web_tools, "async_is_safe_url", new=_async_true):
extract_out = json.loads(asyncio.run(
web_tools.web_extract_tool(["https://a.com"], use_llm_processing=False)
))
assert search_out["data"]["web"][0]["source"] == extract_out["results"][0]["source"] == "[1]"
async def _async_true(url):
return True
# ---------------------------------------------------------------------------
# Config toggles (web.citations / web.summary_stream)
# ---------------------------------------------------------------------------
class TestCitationModes:
def test_default_mode_is_auto(self):
with patch("tools.web_tools._load_web_config", return_value={}):
assert _get_citations_mode() == "auto"
assert _get_citation_guidance() == CITATION_GUIDANCE_AUTO
def test_always_mode(self):
with patch("tools.web_tools._load_web_config", return_value={"citations": "always"}):
assert _get_citation_guidance() == CITATION_GUIDANCE
def test_off_mode_returns_none(self):
with patch("tools.web_tools._load_web_config", return_value={"citations": "off"}):
assert _get_citation_guidance() is None
def test_invalid_mode_falls_back_to_auto(self):
with patch("tools.web_tools._load_web_config", return_value={"citations": "bogus"}):
assert _get_citations_mode() == "auto"
def test_off_mode_strips_annotations_from_search(self):
from tools import web_tools
provider = MagicMock()
provider.name = "fake"
provider.supports_search.return_value = True
provider.search.return_value = {
"success": True,
"data": {"web": [{"title": "A", "url": "https://a.com", "description": "d", "position": 1}]},
}
with patch.object(web_tools, "_ensure_web_plugins_loaded"), \
patch("agent.web_search_registry.get_provider", return_value=provider), \
patch.object(web_tools, "_get_search_backend", return_value="fake"), \
patch.object(web_tools, "_load_web_config", return_value={"citations": "off"}):
out = json.loads(web_tools.web_search_tool("query"))
assert "citation_guidance" not in out
assert "source" not in out["data"]["web"][0]
def test_off_mode_strips_annotations_from_extract(self):
from tools import web_tools
provider = MagicMock()
provider.name = "fake"
provider.supports_extract.return_value = True
provider.extract = MagicMock(return_value=[
{"url": "https://a.com", "title": "A", "content": "body"},
])
with patch.object(web_tools, "_ensure_web_plugins_loaded"), \
patch("agent.web_search_registry.get_provider", return_value=provider), \
patch.object(web_tools, "_get_extract_backend", return_value="fake"), \
patch.object(web_tools, "check_auxiliary_model", return_value=False), \
patch.object(web_tools, "async_is_safe_url", new=_async_true), \
patch.object(web_tools, "_load_web_config", return_value={"citations": "off"}):
out = json.loads(asyncio.run(
web_tools.web_extract_tool(["https://a.com"], use_llm_processing=False)
))
assert "citation_guidance" not in out
assert "source" not in out["results"][0]
def test_summary_stream_toggle(self):
with patch("tools.web_tools._load_web_config", return_value={}):
assert _summary_stream_enabled() is True
with patch("tools.web_tools._load_web_config", return_value={"summary_stream": False}):
assert _summary_stream_enabled() is False
# ---------------------------------------------------------------------------
# Streaming summarizer fast path
# ---------------------------------------------------------------------------
def _make_stream_chunks(texts):
async def _gen():
for t in texts:
yield SimpleNamespace(
choices=[SimpleNamespace(delta=SimpleNamespace(content=t))]
)
return _gen()
class _FakeAsyncClient:
"""Minimal async client whose chat.completions.create returns a stream."""
def __init__(self, chunks=None, exc=None):
self._chunks = chunks
self._exc = exc
self.kwargs = None
outer = self
class _Completions:
async def create(self, **kwargs):
outer.kwargs = kwargs
if outer._exc:
raise outer._exc
return _make_stream_chunks(outer._chunks)
self.chat = SimpleNamespace(completions=_Completions())
class TestStreamSummarizer:
def test_no_callback_returns_none_without_calling(self):
client = _FakeAsyncClient(chunks=["x"])
result = asyncio.run(_try_stream_summarizer(
client, "model", {}, "sys", "user", 1000))
assert result is None
assert client.kwargs is None # never reached the API
def test_streams_and_returns_summary(self):
events = []
summary_display.set_summary_stream_callback(
lambda event, **kw: events.append((event, kw)))
client = _FakeAsyncClient(chunks=["Hello ", "world"])
result = asyncio.run(_try_stream_summarizer(
client, "model", {}, "sys", "user", 1000,
context_str="Title: T\nSource: https://a.com\n\n"))
assert result == "Hello world"
assert client.kwargs["stream"] is True
names = [e[0] for e in events]
assert names == ["start", "delta", "delta", "end"]
assert events[0][1] == {"url": "https://a.com", "title": "T"}
assert events[-1][1]["ok"] is True
assert events[-1][1]["char_count"] == len("Hello world")
# Slot released
assert summary_display.try_acquire_stream_slot(object()) is True
def test_provider_error_falls_back(self):
events = []
summary_display.set_summary_stream_callback(
lambda event, **kw: events.append((event, kw)))
client = _FakeAsyncClient(exc=RuntimeError("stream unsupported"))
result = asyncio.run(_try_stream_summarizer(
client, "model", {}, "sys", "user", 1000))
assert result is None
# Error before "start" → no end event needed
assert all(e[0] != "end" or e[1]["ok"] is False for e in events)
# Slot released even on failure
assert summary_display.try_acquire_stream_slot(object()) is True
def test_empty_stream_falls_back(self):
summary_display.set_summary_stream_callback(lambda e, **kw: None)
client = _FakeAsyncClient(chunks=[])
result = asyncio.run(_try_stream_summarizer(
client, "model", {}, "sys", "user", 1000))
assert result is None
def test_slot_busy_returns_none(self):
summary_display.set_summary_stream_callback(lambda e, **kw: None)
holder = object()
assert summary_display.try_acquire_stream_slot(holder)
try:
client = _FakeAsyncClient(chunks=["x"])
result = asyncio.run(_try_stream_summarizer(
client, "model", {}, "sys", "user", 1000))
assert result is None
assert client.kwargs is None
finally:
summary_display.release_stream_slot(holder)

85
tools/summary_display.py Normal file
View File

@@ -0,0 +1,85 @@
"""Live display hook for web-extract summarization streams.
When ``web_extract`` summarizes a long page, the summarizer LLM call can take
many seconds with no visible activity. Front-ends (currently the interactive
CLI) can register a callback here to mirror the summary tokens into a live
box — the same UX as streaming reasoning blocks.
Design constraints:
- Zero coupling: ``tools/web_tools.py`` only calls module functions here; if
no front-end registered a callback (gateway, cron, subagents, tests) every
call is a cheap no-op and the summarizer runs exactly as before.
- One stream at a time: ``web_extract`` summarizes multiple pages in
parallel. Only the first task to acquire the display slot streams to the
terminal; the others run silently. This avoids interleaved boxes.
Callback protocol::
callback(event: str, **kwargs)
event == "start": kwargs = {"url": str, "title": str}
event == "delta": kwargs = {"text": str}
event == "end": kwargs = {"char_count": int, "ok": bool}
The callback must never raise — but we guard anyway.
"""
from __future__ import annotations
import logging
import threading
from typing import Callable, Optional
logger = logging.getLogger(__name__)
_callback: Optional[Callable] = None
_slot_lock = threading.Lock()
_slot_holder: Optional[object] = None
def set_summary_stream_callback(callback: Optional[Callable]) -> None:
"""Register (or clear, with None) the live summary display callback."""
global _callback
_callback = callback
def get_summary_stream_callback() -> Optional[Callable]:
"""Return the registered callback, or None."""
return _callback
def try_acquire_stream_slot(token: object) -> bool:
"""Try to claim the single live-display slot for ``token``.
Returns True when the caller may stream to the display. Callers MUST
call :func:`release_stream_slot` with the same token when done.
Returns False immediately (non-blocking) when another stream owns the
slot or no callback is registered.
"""
global _slot_holder
if _callback is None:
return False
with _slot_lock:
if _slot_holder is None:
_slot_holder = token
return True
return False
def release_stream_slot(token: object) -> None:
"""Release the live-display slot if ``token`` owns it."""
global _slot_holder
with _slot_lock:
if _slot_holder is token:
_slot_holder = None
def emit(event: str, **kwargs) -> None:
"""Invoke the registered callback, swallowing any error."""
cb = _callback
if cb is None:
return
try:
cb(event, **kwargs)
except Exception:
logger.debug("summary display callback failed for event %s", event, exc_info=True)

View File

@@ -41,6 +41,7 @@ import logging
import os
import re
import asyncio
import threading
from typing import List, Dict, Any, Optional, TYPE_CHECKING
import httpx # noqa: F401 — kept at module top so tests can patch tools.web_tools.httpx
# After the web-provider plugin migration (PR #25182), the Firecrawl SDK
@@ -346,6 +347,105 @@ def _web_requires_env() -> list[str]:
DEFAULT_MIN_LENGTH_FOR_SUMMARIZATION = 5000
# ---------------------------------------------------------------------------
# Source registry — stable numbered citation IDs for grounding
# ---------------------------------------------------------------------------
# Perplexity-style grounding works because the model only ever emits small
# stable integers ("[3]") that were assigned at retrieval time — it cannot
# hallucinate a URL. We keep a process-wide URL → ID map so the same page
# keeps the same citation number across web_search and web_extract calls
# within a session, and annotate every result with its marker.
#
# Research basis: ALCE (arXiv:2305.14627) shows citing-during-generation from
# numbered snippets beats post-hoc attribution; WebGPT (arXiv:2112.09332)
# grounds answers in verbatim quotes collected at browse time. The summarizer
# prompts below preserve verbatim quotes for that reason.
_source_registry: Dict[str, int] = {}
_source_registry_lock = threading.Lock()
def _normalize_source_url(url: str) -> str:
"""Canonicalize a URL for registry identity (strip fragment, trailing /)."""
u = (url or "").strip()
if "#" in u:
u = u.split("#", 1)[0]
return u.rstrip("/") or u
def get_source_id(url: str) -> int:
"""Return the stable citation ID for ``url``, assigning one if new."""
key = _normalize_source_url(url)
with _source_registry_lock:
sid = _source_registry.get(key)
if sid is None:
sid = len(_source_registry) + 1
_source_registry[key] = sid
return sid
def reset_source_registry() -> None:
"""Clear the URL → citation-ID map (tests / new sessions)."""
with _source_registry_lock:
_source_registry.clear()
CITATION_GUIDANCE = (
"GROUNDING: Each result has a stable source id like [3]. When you answer "
"the user using facts from these sources, cite by placing the bracketed "
"id(s) immediately after each sentence they support, e.g. 'Ice is less "
"dense than water.[1][2]' — no space before the bracket, at most 3 ids "
"per sentence. Cite while writing, per supported sentence (not one "
"citation dumped at the end). Only cite ids that appear in tool results "
"you actually received; never invent an id or cite a source you did not "
"read. Claims from your own knowledge get no citation. If sources "
"conflict, present both with their ids. End the answer with a 'Sources:' "
"list mapping each cited id to its URL (e.g. '[1] https://...') — list "
"only ids you cited."
)
# "auto" mode prefix: the model decides per-request whether the answer is
# research/report-shaped. Mirrors Perplexity's leaked-prompt approach of
# exempting query classes (translation, creative writing) via instruction
# rather than a separate classifier.
CITATION_GUIDANCE_AUTO = (
"GROUNDING (conditional): Each result has a stable source id like [3]. "
"IF the user's request is research-, report-, or fact-finding-shaped — "
"they asked for a sourced/grounded answer, a summary of information from "
"the web, a comparison, news, or anything where claim provenance matters "
"— then cite: place the bracketed id(s) immediately after each sentence "
"they support, e.g. 'Ice is less dense than water.[1][2]' — no space "
"before the bracket, at most 3 ids per sentence, cite while writing (not "
"one citation dumped at the end), and end with a 'Sources:' list mapping "
"each cited id to its URL — list only ids you cited. Only cite ids that "
"appear in tool results you actually received; never invent an id. "
"Claims from your own knowledge get no citation. IF instead the search "
"is incidental to a different task (quick lookup mid-coding, checking "
"syntax/versions, casual conversation, creative writing), skip inline "
"citations and just answer naturally — mention a URL only if the user "
"would plausibly want the link."
)
def _get_citations_mode() -> str:
"""Return the web citations mode: 'auto' (default), 'always', or 'off'."""
mode = str(_load_web_config().get("citations", "auto") or "auto").strip().lower()
return mode if mode in {"auto", "always", "off"} else "auto"
def _get_citation_guidance() -> Optional[str]:
"""Return the guidance text for the active mode, or None when off."""
mode = _get_citations_mode()
if mode == "off":
return None
return CITATION_GUIDANCE if mode == "always" else CITATION_GUIDANCE_AUTO
def _summary_stream_enabled() -> bool:
"""Whether live summary streaming to a registered display is enabled."""
return bool(_load_web_config().get("summary_stream", True))
def _is_nous_auxiliary_client(client: Any) -> bool:
"""Return True when the resolved auxiliary backend is Nous Portal."""
from urllib.parse import urlparse
@@ -478,6 +578,96 @@ async def process_content_with_llm(
return truncated
async def _try_stream_summarizer(
aux_client: Any,
model: str,
extra_body: Dict[str, Any],
system_prompt: str,
user_prompt: str,
max_tokens: int,
context_str: str = "",
) -> Optional[str]:
"""Attempt a streaming summarizer call mirrored to the live display.
Returns the full summary text on success, or ``None`` to signal the
caller to fall back to the standard non-streaming path (no callback
registered, display slot busy, provider doesn't support streaming, or
any error mid-stream). Never raises.
"""
from tools.summary_display import (
emit,
release_stream_slot,
try_acquire_stream_slot,
)
token = object()
if not try_acquire_stream_slot(token):
return None
started = False
try:
# Parse "Title: ..." / "Source: ..." lines out of context_str for
# the display header.
url = title = ""
for line in (context_str or "").splitlines():
if line.startswith("Source: "):
url = line[len("Source: "):].strip()
elif line.startswith("Title: "):
title = line[len("Title: "):].strip()
from agent.auxiliary_client import _get_task_timeout
timeout = _get_task_timeout("web_extract")
kwargs: Dict[str, Any] = {
"model": model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
"temperature": 0.1,
"max_tokens": max_tokens,
"stream": True,
}
if timeout:
kwargs["timeout"] = timeout
if extra_body:
kwargs["extra_body"] = extra_body
stream = await aux_client.chat.completions.create(**kwargs)
emit("start", url=url, title=title)
started = True
parts: List[str] = []
async for chunk in stream:
try:
delta = chunk.choices[0].delta
except (AttributeError, IndexError):
continue
text = getattr(delta, "content", None)
if text:
parts.append(text)
emit("delta", text=text)
summary = "".join(parts).strip()
emit("end", char_count=len(summary), ok=bool(summary))
started = False
if summary:
return summary
# Empty stream (reasoning-only model, provider quirk) — fall back.
logger.debug("Streaming summarizer returned empty content; falling back")
return None
except Exception as e:
# Streaming is best-effort: adapters without stream support,
# transport errors, etc. all fall back to the robust path.
logger.debug("Streaming summarizer failed (%s); falling back to non-streaming", str(e)[:120])
if started:
emit("end", char_count=0, ok=False)
return None
finally:
release_stream_slot(token)
async def _call_summarizer_llm(
content: str,
context_str: str,
@@ -511,6 +701,11 @@ Important guidelines for chunk processing:
4. Use bullet points and structured formatting for easy synthesis later
5. Note any references to other sections (e.g., "as mentioned earlier", "see below") without trying to resolve them
GROUNDING RULES (critical — this summary will be used to answer questions with citations to this source):
- Keep citable facts as short VERBATIM quotes in "double quotes"; keep numbers, dates, names, and figures EXACTLY as written
- Use ONLY information from the provided section. Never add outside knowledge, never infer facts the text does not state
- If the section is ambiguous or silent on something it seems like it should cover, say so explicitly
Your output will be combined with summaries of other sections, so focus on thorough extraction rather than narrative flow."""
user_prompt = f"""Extract key information from this SECTION of a larger document:
@@ -531,6 +726,12 @@ Create a well-structured markdown summary that includes:
2. Comprehensive summary of all other important information
3. Proper markdown formatting with headers, bullets, and emphasis
GROUNDING RULES (critical — this summary will be used to answer questions with citations to this page):
- Preserve citable facts as short VERBATIM quotes in "double quotes"; keep all numbers, dates, names, versions, and figures EXACTLY as the page states them
- Use ONLY information present on the page. Never blend in outside knowledge, never fill gaps with plausible-sounding details
- If the page does NOT cover something a reader would expect, note that explicitly (e.g. "The page does not mention pricing")
- Attribute claims the page itself attributes (e.g. 'the author claims...', 'according to the cited study...') rather than stating them as bare fact
Your goal is to preserve ALL important information while reducing length. Never lose key facts, figures, insights, or actionable information. Make it scannable and well-organized."""
user_prompt = f"""Please process this web content and create a comprehensive markdown summary:
@@ -552,6 +753,23 @@ Create a markdown summary that captures all key information in a well-organized,
if aux_client is None or not effective_model:
logger.warning("No auxiliary model available for web content processing")
return None
# ── Live-display streaming fast path ──────────────────────
# When a front-end (CLI) registered a summary display callback
# and no other summarization stream owns the display slot, run
# the call in streaming mode and mirror tokens to the UI.
# Any failure falls through to the standard non-streaming path
# with its full retry/fallback machinery.
# Gated by web.summary_stream in config.yaml (default: on).
if attempt == 0 and not is_chunk and _summary_stream_enabled():
streamed = await _try_stream_summarizer(
aux_client, effective_model, extra_body,
system_prompt, user_prompt, max_tokens,
context_str=context_str,
)
if streamed:
return streamed
call_kwargs = {
"task": "web_extract",
"model": effective_model,
@@ -1004,6 +1222,22 @@ def web_search_tool(query: str, limit: int = 5) -> str:
)
response_data = provider.search(query, limit)
# ── Grounding annotations ──────────────────────────────────────
# Tag each result with its stable citation id and attach citation
# guidance so the model grounds its answer Perplexity-style.
# Gated by web.citations in config.yaml ("auto" | "always" | "off").
try:
guidance = _get_citation_guidance()
if guidance is not None:
web_results = response_data.get("data", {}).get("web", []) if isinstance(response_data, dict) else []
for r in web_results:
if isinstance(r, dict) and r.get("url"):
r["source"] = f"[{get_source_id(r['url'])}]"
if web_results and isinstance(response_data, dict):
response_data["citation_guidance"] = guidance
except Exception:
logger.debug("Failed to annotate search results with source ids", exc_info=True)
debug_call_data["results_count"] = len(response_data.get("data", {}).get("web", []))
result_json = json.dumps(response_data, indent=2, ensure_ascii=False)
debug_call_data["final_response_size"] = len(result_json)
@@ -1278,17 +1512,21 @@ async def web_extract_tool(
logger.info("%s (%d characters)", url, content_length)
# Trim output to minimal fields per entry: title, content, error
_extract_guidance = _get_citation_guidance()
trimmed_results = [
{
"url": r.get("url", ""),
"title": r.get("title", ""),
**({"source": f"[{get_source_id(r['url'])}]"} if (_extract_guidance is not None and r.get("url")) else {}),
"content": r.get("content", ""),
"error": r.get("error"),
**({ "blocked_by_policy": r["blocked_by_policy"]} if "blocked_by_policy" in r else {}),
}
for r in response.get("results", [])
]
trimmed_response = {"results": trimmed_results}
trimmed_response: Dict[str, Any] = {"results": trimmed_results}
if _extract_guidance is not None and any(r.get("content") for r in trimmed_results):
trimmed_response["citation_guidance"] = _extract_guidance
if _free_parallel_extract:
# Credit Parallel's free Search MCP (drives the "[Parallel]" UI tag
# + lets the model cite the source). Free tier only.
@@ -1508,7 +1746,7 @@ from tools.registry import registry, tool_error
WEB_SEARCH_SCHEMA = {
"name": "web_search",
"description": "Search the web for information. Returns up to 5 results by default with titles, URLs, and descriptions. The query is passed through to the configured backend, so operators such as site:domain, filetype:pdf, intitle:word, -term, and \"exact phrase\" may work when the backend supports them.",
"description": "Search the web for information. Returns up to 5 results by default with titles, URLs, and descriptions. Results may carry a stable citation id in a 'source' field (e.g. \"[3]\") with citation guidance in the response; follow that guidance when present. The query is passed through to the configured backend, so operators such as site:domain, filetype:pdf, intitle:word, -term, and \"exact phrase\" may work when the backend supports them.",
"parameters": {
"type": "object",
"properties": {
@@ -1530,7 +1768,7 @@ WEB_SEARCH_SCHEMA = {
WEB_EXTRACT_SCHEMA = {
"name": "web_extract",
"description": "Extract content from web page URLs. Returns page content in markdown format. Also works with PDF URLs (arxiv papers, documents, etc.) — pass the PDF link directly and it converts to markdown text. Pages under 5000 chars return full markdown; larger pages are LLM-summarized and capped at ~5000 chars per page. Pages over 2M chars are refused. If a URL fails or times out, use the browser tool to access it instead.",
"description": "Extract content from web page URLs. Returns page content in markdown format. Results may carry a stable citation id in a 'source' field (e.g. \"[3]\") with citation guidance in the response; follow that guidance when present. Also works with PDF URLs (arxiv papers, documents, etc.) — pass the PDF link directly and it converts to markdown text. Pages under 5000 chars return full markdown; larger pages are LLM-summarized and capped at ~5000 chars per page. Pages over 2M chars are refused. If a URL fails or times out, use the browser tool to access it instead.",
"parameters": {
"type": "object",
"properties": {

View File

@@ -1592,6 +1592,22 @@ web:
**Parallel search modes:** Set `PARALLEL_SEARCH_MODE` to control search behavior — `fast`, `one-shot`, or `agentic` (default: `agentic`).
### Grounded citations
Web results carry stable numbered source ids (`[1]`, `[2]`, …) that the agent cites inline, Perplexity-style, with a `Sources:` list at the end of the answer. Control this with `web.citations`:
```yaml
web:
citations: auto # auto (default) | always | off
summary_stream: true # live summarization box in the CLI for long pages
```
- **`auto`** (default) — the agent cites inline only when the request is research/report-shaped (sourced summaries, comparisons, news, fact-finding). Incidental lookups (checking syntax mid-coding, casual questions) stay citation-free.
- **`always`** — cite inline whenever the answer uses web results.
- **`off`** — no source ids or citation guidance (pre-feature behavior).
`web.summary_stream` toggles the live "Summarizing" box shown in the CLI while `web_extract` condenses long pages (gateway and cron are unaffected).
**Exa:** Set `EXA_API_KEY` in `~/.hermes/.env`. Supports `category` filtering (`company`, `research paper`, `news`, `people`, `personal site`, `pdf`) and domain/date filters.
## Browser