1 Commits

Author SHA1 Message Date
teknium1
d3acfeda03 feat(watchers): interval-polling watcher engine with watermark dedup
Adds 'hermes watch' — the pull-based sibling of webhooks. Watchers poll
an external source on an interval, dedup new items via a per-watcher
watermark, and either deliver the raw items verbatim or hand them to a
short-lived agent.

Inspired by Vellum Assistant's watcher system.

## New surface

- `watchers/` package: engine + store + providers registry
- 3 built-in providers: `http_json` (any JSON endpoint), `rss` (RSS 2.0 + Atom),
  `github` (issues/pulls/releases/commits or search)
- `hermes watch {add,list,remove,run,reset,tick}` CLI
- Custom providers: `from watchers.providers import register`

## Integration

- Piggybacks on cron's tick loop — if cron runs, watchers run. Own file
  lock prevents double-fire across gateway/daemon/manual ticks.
- Delivery reuses cron's `_deliver_result` so `multi`/`all`/platform targets
  all work out of the box.
- First poll of a new watcher **records a baseline** — it never replays
  the existing feed.  Only items that appear after baseline are delivered.
- Watermark is a bounded ID set (max_seen, default 500) to cap memory.
  Stored per-watcher at `~/.hermes/watchers/<name>.watermark.json`.
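
The baseline and bounded-watermark rules above can be sketched as follows. This is an illustration under stated assumptions, not the code shipped in `watchers/providers.py`: the helper name `dedup_against_watermark` is hypothetical, and treating a watermark without a `seen_ids` key as "never polled" is an assumption drawn from the tests in this commit.

```python
from typing import Any, Dict, List, Tuple


def dedup_against_watermark(
    items: List[Dict[str, Any]],
    watermark: Dict[str, Any],
    *,
    id_field: str = "id",
    max_seen: int = 500,
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Return (new_items, new_watermark) for one poll (hypothetical helper)."""
    # Assumption: a watermark with no "seen_ids" key means "never polled".
    first_poll = "seen_ids" not in watermark
    seen: List[str] = list(watermark.get("seen_ids", []))
    known = set(seen)
    new_items: List[Dict[str, Any]] = []

    for item in items:
        if id_field not in item:
            continue  # items without an ID cannot be deduplicated
        item_id = str(item[id_field])
        if item_id in known:
            continue
        known.add(item_id)
        seen.append(item_id)
        if not first_poll:
            # Only polls after the baseline emit anything.
            new_items.append(item)

    # Keep only the most recent max_seen IDs so the watermark stays bounded.
    return new_items, {"seen_ids": seen[-max_seen:]}
```

One consequence of the cap, at least in this sketch: an item whose ID has been evicted from the set would be delivered again if the source ever returns it again, so `max_seen` should comfortably exceed the page size of the feed.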

## Changes
- watchers/{__init__,store,providers,engine}.py: new
- hermes_cli/watchers.py: new CLI command
- hermes_cli/main.py: argparse wiring + cmd_watch entrypoint
- cron/scheduler.py: best-effort watcher tick after cron tick
- tests/watchers/{test_store,test_providers,test_engine}.py: 46 tests
- website/docs/user-guide/features/watchers.md + sidebar entry

## Validation
```
scripts/run_tests.sh tests/watchers/                   # 46 passed
scripts/run_tests.sh tests/cron/test_scheduler.py      # 115 passed (piggyback hook safe)
HERMES_HOME=/tmp/... python -m hermes_cli.main watch --help   # CLI parses
HERMES_HOME=/tmp/... python -m hermes_cli.main watch list     # empty list prints "No watchers registered." plus an add hint
```
2026-05-07 13:15:45 -07:00
13 changed files with 1897 additions and 1 deletions

cron/scheduler.py

@@ -1724,6 +1724,20 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
except Exception as _e:
logger.debug("Post-tick MCP orphan cleanup failed: %s", _e)
# Piggyback watcher tick — runs after cron jobs so cron stays the
# priority path. Watchers have their own file lock and are safe to
# invoke from any tick. Best-effort: if the watcher subsystem is
# missing (e.g. mid-migration), cron keeps working.
try:
from watchers.engine import tick as _watcher_tick
_watcher_outcomes = _watcher_tick(adapters=adapters, loop=loop, verbose=verbose)
if verbose and _watcher_outcomes:
logger.info("%d watcher(s) polled", len(_watcher_outcomes))
except ImportError:
pass
except Exception as _wexc:
logger.warning("Watcher tick failed (non-fatal): %s", _wexc)
return sum(_results)
finally:
if fcntl:
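
The hunk above follows a general "best-effort hook" pattern: import the optional subsystem lazily, skip silently if it is absent, and downgrade any other failure to a warning so the host loop keeps running. A minimal generic sketch of that pattern, assuming a hypothetical helper named `run_optional_hook` (the real change inlines the try/except rather than using a helper):

```python
import importlib
import logging
from typing import Any, Optional

logger = logging.getLogger("piggyback_sketch")


def run_optional_hook(module_name: str, func_name: str = "tick", **kwargs: Any) -> Optional[Any]:
    """Call module_name.func_name(**kwargs) if available; never raise."""
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        return None  # subsystem not installed: skip silently
    try:
        return getattr(module, func_name)(**kwargs)
    except Exception as exc:  # deliberately broad: the hook must not break the caller
        logger.warning("%s.%s failed (non-fatal): %s", module_name, func_name, exc)
        return None
```

The trade-off is that an `ImportError` raised deep inside the optional module is also swallowed without a log line, which is convenient mid-migration but can hide a broken install.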

hermes_cli/main.py

@@ -5239,12 +5239,19 @@ def cmd_cron(args):
def cmd_webhook(args):
"""Webhook subscription management."""
"""Entry point for 'hermes webhook' command."""
from hermes_cli.webhook import webhook_command
webhook_command(args)
def cmd_watch(args):
"""Entry point for 'hermes watch' command."""
from hermes_cli.watchers import watch_command
watch_command(args)
def cmd_slack(args):
"""Slack integration helpers.
@@ -8070,6 +8077,7 @@ def _coalesce_session_name_args(argv: list) -> list:
"plugins",
"acp",
"webhook",
"watch",
"memory",
"dump",
"debug",
@@ -9265,6 +9273,83 @@ def main():
webhook_parser.set_defaults(func=cmd_webhook)
# =========================================================================
# watch command — interval-polling watchers (pull-based sibling of webhook)
# =========================================================================
watch_parser = subparsers.add_parser(
"watch",
help="Manage interval-polling watchers (HTTP JSON, RSS, GitHub, ...)",
description=(
"Watchers poll an external source on an interval, detect new items via "
"watermark-based dedup, and deliver the result (verbatim in --deliver-only "
"mode, or as prompt context to a short-lived agent otherwise)."
),
)
watch_subparsers = watch_parser.add_subparsers(dest="watch_action")
w_add = watch_subparsers.add_parser(
"add", aliases=["subscribe"], help="Create or update a watcher"
)
w_add.add_argument("name", help="Watcher name (lowercase alphanumerics + '-'/'_')")
w_add.add_argument(
"--provider",
required=True,
help="Provider name: http_json | rss | github | <custom>",
)
w_add.add_argument(
"--interval", type=int, default=300, help="Poll interval in seconds (default: 300)"
)
w_add.add_argument("--prompt", default="", help="Prompt template (supports {items_json}, {name}, {count})")
w_add.add_argument("--skills", default="", help="Comma-separated skill names to load")
w_add.add_argument(
"--deliver",
default="origin",
help="Delivery target: origin | local | multi | all | telegram | telegram:-100:17 | ... (see 'hermes cron' docs)",
)
w_add.add_argument(
"--deliver-only",
action="store_true",
help="Skip the agent — deliver the rendered prompt directly. Zero LLM cost.",
)
w_add.add_argument("--url", default="", help="Shortcut for --arg url=<URL> (http_json / rss)")
w_add.add_argument("--repo", default="", help="Shortcut for --arg repo=owner/name (github)")
w_add.add_argument(
"--scope",
default="",
help="github scope: issues | pulls | releases | commits (default: issues)",
)
w_add.add_argument(
"--arg",
action="append",
default=[],
help="Additional provider config as key=value (repeatable)",
)
w_add.add_argument(
"--config",
default="",
help="Full provider config as a JSON object (merged with --arg)",
)
w_list = watch_subparsers.add_parser("list", aliases=["ls"], help="List all watchers")
w_list.add_argument("--verbose", action="store_true", help="Include last_error detail")
w_rm = watch_subparsers.add_parser("remove", aliases=["rm"], help="Remove a watcher")
w_rm.add_argument("name", help="Watcher name to remove")
w_run = watch_subparsers.add_parser(
"run", help="Fire one watcher once, out of band (respects dedup)"
)
w_run.add_argument("name", help="Watcher name to run")
w_reset = watch_subparsers.add_parser(
"reset", help="Clear a watcher's watermark — next run treats it as a first poll"
)
w_reset.add_argument("name", help="Watcher name to reset")
watch_subparsers.add_parser("tick", help="Poll every due watcher (ad-hoc)")
watch_parser.set_defaults(func=cmd_watch)
# =========================================================================
# kanban command — multi-profile collaboration board
# =========================================================================

hermes_cli/watchers.py Normal file

@@ -0,0 +1,215 @@
"""hermes watch — manage polling watchers from the CLI.
Usage:
hermes watch add <name> --provider <p> --url <url> [--interval 300] ...
hermes watch list
hermes watch remove <name>
hermes watch run <name> # fire the watcher once, out of band
hermes watch reset <name> # clear the watermark; next run re-records the baseline without delivering
hermes watch tick # poll every due watcher once
Watchers poll an external source on an interval, detect new items via
watermark-based dedup, and deliver the result (verbatim or via agent).
Subscriptions persist to ~/.hermes/watchers.json.
"""
from __future__ import annotations
import json
import re
import time
from typing import Any, Dict, List
from hermes_constants import display_hermes_home
from watchers.engine import run_watcher, tick
from watchers.providers import PROVIDERS
from watchers.store import (
WatcherSubscription,
delete_watcher,
get_watcher,
list_watchers,
load_watermark,
save_watcher,
save_watermark,
)
_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,63}$")
def _parse_kv_pairs(items: List[str], *, label: str) -> Dict[str, str]:
"""Parse a list of ``k=v`` strings into a dict; errors out cleanly."""
out: Dict[str, str] = {}
for item in items or []:
if "=" not in item:
raise ValueError(
f"--{label} expects key=value pairs; got {item!r}"
)
k, v = item.split("=", 1)
out[k.strip()] = v.strip()
return out
def _parse_config_json(raw: str) -> Dict[str, Any]:
raw = (raw or "").strip()
if not raw:
return {}
try:
parsed = json.loads(raw)
except json.JSONDecodeError as e:
raise ValueError(f"--config must be valid JSON: {e}") from e
if not isinstance(parsed, dict):
raise ValueError(f"--config must be a JSON object; got {type(parsed).__name__}")
return parsed
def watch_command(args) -> None:
"""Entry point for 'hermes watch' subcommand."""
sub = getattr(args, "watch_action", None)
if not sub:
print("Usage: hermes watch {add|list|remove|run|reset|tick}")
print("Run 'hermes watch --help' for details.")
return
if sub in ("add", "subscribe"):
_cmd_add(args)
elif sub in ("list", "ls"):
_cmd_list(args)
elif sub in ("remove", "rm"):
_cmd_remove(args)
elif sub == "run":
_cmd_run(args)
elif sub == "reset":
_cmd_reset(args)
elif sub == "tick":
_cmd_tick(args)
else:
print(f"Unknown watch subcommand: {sub}")
def _cmd_add(args) -> None:
name = (args.name or "").strip().lower().replace(" ", "-")
if not _NAME_RE.match(name):
print(f"Error: name must be lowercase alphanumerics + '-'/'_' (got {args.name!r})")
return
provider = (args.provider or "").lower()
if provider not in PROVIDERS:
print(f"Error: unknown provider {args.provider!r}.")
print(f" Known providers: {sorted(PROVIDERS)}")
return
# Config assembly: --config JSON + individual --arg flags merged.
try:
cfg = _parse_config_json(getattr(args, "config", "") or "")
cfg.update(_parse_kv_pairs(getattr(args, "arg", None) or [], label="arg"))
except ValueError as e:
print(f"Error: {e}")
return
# Convenience flags — providers that commonly need these get dedicated args.
if getattr(args, "url", None):
cfg.setdefault("url", args.url)
if getattr(args, "repo", None):
cfg.setdefault("repo", args.repo)
if getattr(args, "scope", None):
cfg.setdefault("scope", args.scope)
existing = get_watcher(name)
sub = WatcherSubscription(
name=name,
provider=provider,
config=cfg,
interval_seconds=int(args.interval),
prompt=args.prompt or "",
skills=[s.strip() for s in (args.skills or "").split(",") if s.strip()],
deliver=args.deliver or "origin",
deliver_only=bool(args.deliver_only),
enabled=True,
created_at=(existing.created_at if existing else time.time()),
)
save_watcher(sub)
verb = "Updated" if existing else "Added"
print(f"{verb} watcher '{name}':")
print(f" provider = {sub.provider}")
print(f" interval = {sub.interval_seconds}s")
print(f" deliver = {sub.deliver}")
print(f" deliver_only = {sub.deliver_only}")
print(f" config = {json.dumps(sub.config, indent=2)}")
if not existing:
print()
print(f"The watermark file will be created at {display_hermes_home()}/watchers/{name}.watermark.json")
print("on first successful poll. The first poll records baseline state and")
print("does NOT fire; only items that appear AFTER the baseline are delivered.")
def _cmd_list(args) -> None:
subs = list_watchers()
if not subs:
print("No watchers registered.")
print()
print("Add one with: hermes watch add <name> --provider <p> --url <url>")
return
print(f"{'NAME':<20} {'PROVIDER':<12} {'INTERVAL':<10} {'LAST RUN':<21} {'EVENTS':<7} {'STATUS'}")
for sub in sorted(subs, key=lambda s: s.name):
last_run = (
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(sub.last_run_at))
if sub.last_run_at
else "never"
)
status = "disabled" if not sub.enabled else ("error" if sub.last_error else "ok")
print(
f"{sub.name:<20} {sub.provider:<12} {sub.interval_seconds!s:<10} "
f"{last_run:<21} {sub.last_event_count:<7} {status}"
)
if sub.last_error and getattr(args, "verbose", False):
print(f" ↳ last_error: {sub.last_error}")
def _cmd_remove(args) -> None:
name = args.name.strip().lower()
if delete_watcher(name):
print(f"Removed watcher '{name}' and cleared its watermark.")
else:
print(f"No watcher named '{name}'.")
def _cmd_run(args) -> None:
name = args.name.strip().lower()
sub = get_watcher(name)
if not sub:
print(f"No watcher named '{name}'.")
return
print(f"Polling watcher '{name}' ({sub.provider})...")
outcome = run_watcher(sub)
sub.last_run_at = time.time()
sub.last_error = outcome.get("error")
sub.last_event_count = outcome.get("new_events", 0)
save_watcher(sub)
print(f" status = {outcome.get('status')}")
print(f" new events = {outcome.get('new_events')}")
if outcome.get("error"):
print(f" error = {outcome['error']}")
def _cmd_reset(args) -> None:
name = args.name.strip().lower()
if not get_watcher(name):
print(f"No watcher named '{name}'.")
return
save_watermark(name, {})
print(f"Cleared watermark for '{name}'. Next poll will treat it as a first run.")
def _cmd_tick(args) -> None:
"""Fire every due watcher once and print outcomes (CLI-only ad-hoc poll)."""
outcomes = tick(verbose=True)
if not outcomes:
print("No watchers due.")
return
for o in outcomes:
flag = "!" if o["status"] != "ok" else " "
err = f" [{o['error']}]" if o.get("error") else ""
print(f" {flag} {o['name']:<20} {o['status']:<8} events={o['new_events']}{err}")
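
The precedence that `_cmd_add` applies when assembling provider config is easy to miss: `--config` is the base, `--arg` pairs override it, and the `--url`/`--repo`/`--scope` shortcuts only fill keys that are still missing. A reduced, self-contained sketch of that order (the function name `build_provider_config` and the trimmed flag set are illustrative, not part of the CLI):

```python
import argparse
import json
from typing import Any, Dict, List


def build_provider_config(argv: List[str]) -> Dict[str, Any]:
    """Merge --config, --arg and --url in the same order _cmd_add uses."""
    parser = argparse.ArgumentParser(prog="watch-add-sketch")
    parser.add_argument("--config", default="")
    parser.add_argument("--arg", action="append", default=[])
    parser.add_argument("--url", default="")
    args = parser.parse_args(argv)

    # 1. --config is the base layer and keeps its JSON types.
    cfg: Dict[str, Any] = json.loads(args.config) if args.config.strip() else {}

    # 2. --arg pairs override the base; their values are always strings.
    for pair in args.arg:
        if "=" not in pair:
            raise ValueError(f"--arg expects key=value pairs; got {pair!r}")
        key, value = pair.split("=", 1)
        cfg[key.strip()] = value.strip()

    # 3. The shortcut flag only fills a key that is still missing.
    if args.url:
        cfg.setdefault("url", args.url)
    return cfg
```

Because `--arg` values stay strings, a numeric option such as `max_seen` passed that way arrives as `"100"` rather than `100`; passing it through `--config` preserves the number.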

tests/watchers/__init__.py

@@ -0,0 +1 @@
"""Marker file so pytest discovers the watchers test package."""

tests/watchers/test_engine.py

@@ -0,0 +1,232 @@
"""Tests for watchers/engine.py — due-check, prompt rendering, run_watcher outcomes.
These avoid touching the real AIAgent / cron delivery — we patch them so the
engine logic (dedup, baseline, error capture) is isolated.
"""
from __future__ import annotations
import time
from unittest.mock import MagicMock, patch
import pytest
from watchers.engine import _is_due, _render_prompt, run_watcher, tick
from watchers.providers import PROVIDERS, ProviderError
from watchers.store import WatcherSubscription, save_watcher
@pytest.fixture
def watcher_home(tmp_path, monkeypatch):
home = tmp_path / ".hermes"
home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(home))
import importlib
import hermes_constants
importlib.reload(hermes_constants)
return home
class TestIsDue:
def test_new_watcher_is_always_due(self):
sub = WatcherSubscription(name="n", provider="http_json", interval_seconds=60)
assert _is_due(sub) is True
def test_recent_run_is_not_due(self):
sub = WatcherSubscription(
name="n",
provider="http_json",
interval_seconds=60,
last_run_at=time.time() - 10,
)
assert _is_due(sub) is False
def test_old_run_is_due(self):
sub = WatcherSubscription(
name="n",
provider="http_json",
interval_seconds=60,
last_run_at=time.time() - 120,
)
assert _is_due(sub) is True
def test_disabled_never_due(self):
sub = WatcherSubscription(
name="n",
provider="http_json",
interval_seconds=60,
last_run_at=None,
enabled=False,
)
assert _is_due(sub) is False
def test_interval_floor_prevents_flooding(self):
"""Interval below 5s is clamped to 5s so a bad config can't DDOS."""
sub = WatcherSubscription(
name="n",
provider="http_json",
interval_seconds=1,
last_run_at=time.time() - 3,
)
assert _is_due(sub) is False
class TestRenderPrompt:
def test_default_prompt_shows_count_and_items(self):
sub = WatcherSubscription(name="feed", provider="rss")
rendered = _render_prompt("", [{"id": "a"}, {"id": "b"}], sub)
assert "feed" in rendered
assert "2 new event" in rendered
assert '"id": "a"' in rendered
def test_placeholder_substitution(self):
sub = WatcherSubscription(name="pr-watcher", provider="github")
template = "Watcher {name} saw {count} new items:\n{items_json}"
rendered = _render_prompt(template, [{"x": 1}], sub)
assert rendered.startswith("Watcher pr-watcher saw 1 new items:")
assert '"x": 1' in rendered
def test_unknown_placeholders_pass_through(self):
"""Users shouldn't be punished for typos with a KeyError."""
sub = WatcherSubscription(name="w", provider="http_json")
rendered = _render_prompt("hi {unknown} there", [], sub)
assert "{unknown}" in rendered
class TestRunWatcher:
def test_unknown_provider_returns_error(self, watcher_home):
sub = WatcherSubscription(name="bad", provider="does-not-exist")
outcome = run_watcher(sub)
assert outcome["status"] == "error"
assert "Unknown watcher provider" in outcome["error"]
def test_provider_error_is_captured(self, watcher_home):
def failing(_config, _watermark):
raise ProviderError("backend down")
PROVIDERS["unit-failing"] = failing
try:
sub = WatcherSubscription(name="w", provider="unit-failing", config={})
outcome = run_watcher(sub)
assert outcome["status"] == "error"
assert "backend down" in outcome["error"]
finally:
PROVIDERS.pop("unit-failing", None)
def test_empty_delta_produces_no_delivery(self, watcher_home):
def noop(_c, _wm):
return [], {"seen_ids": ["old"]}
PROVIDERS["unit-noop"] = noop
try:
sub = WatcherSubscription(name="w", provider="unit-noop")
with patch("watchers.engine._deliver_payload") as deliver_mock, \
patch("watchers.engine._run_agent_for_watcher") as agent_mock:
outcome = run_watcher(sub)
assert outcome["status"] == "ok"
assert outcome["new_events"] == 0
deliver_mock.assert_not_called()
agent_mock.assert_not_called()
finally:
PROVIDERS.pop("unit-noop", None)
def test_deliver_only_skips_agent(self, watcher_home):
def new_item(_c, _wm):
return [{"id": "1", "title": "new thing"}], {"seen_ids": ["1"]}
PROVIDERS["unit-fresh"] = new_item
try:
sub = WatcherSubscription(
name="w", provider="unit-fresh", deliver="local", deliver_only=True
)
with patch("watchers.engine._deliver_payload", return_value=None) as deliver_mock, \
patch("watchers.engine._run_agent_for_watcher") as agent_mock:
outcome = run_watcher(sub)
assert outcome["status"] == "ok"
assert outcome["new_events"] == 1
deliver_mock.assert_called_once()
agent_mock.assert_not_called()
# The prompt was passed verbatim as the delivery payload.
delivered_content = deliver_mock.call_args[0][1]
assert "new thing" in delivered_content
finally:
PROVIDERS.pop("unit-fresh", None)
def test_agent_mode_invokes_agent_then_delivers_its_response(self, watcher_home):
def new_item(_c, _wm):
return [{"id": "1"}], {"seen_ids": ["1"]}
PROVIDERS["unit-fresh2"] = new_item
try:
sub = WatcherSubscription(name="w", provider="unit-fresh2", deliver="local")
with patch(
"watchers.engine._run_agent_for_watcher", return_value="Agent's reply"
) as agent_mock, patch(
"watchers.engine._deliver_payload", return_value=None
) as deliver_mock:
outcome = run_watcher(sub)
agent_mock.assert_called_once()
deliver_mock.assert_called_once()
assert deliver_mock.call_args[0][1] == "Agent's reply"
assert outcome["status"] == "ok"
finally:
PROVIDERS.pop("unit-fresh2", None)
class TestTickLoop:
def test_tick_skips_not_due_watchers(self, watcher_home):
"""Watchers whose interval hasn't elapsed are not polled."""
sub = WatcherSubscription(
name="fresh",
provider="http_json",
interval_seconds=3600,
last_run_at=time.time() - 60,
)
save_watcher(sub)
with patch("watchers.engine.run_watcher") as run_mock:
outcomes = tick()
run_mock.assert_not_called()
assert outcomes == []
def test_tick_runs_due_watcher_and_persists_last_run_at(self, watcher_home):
sub = WatcherSubscription(
name="stale",
provider="http_json",
interval_seconds=60,
last_run_at=time.time() - 300,
)
save_watcher(sub)
with patch(
"watchers.engine.run_watcher",
return_value={"name": "stale", "status": "ok", "new_events": 0, "error": None},
):
outcomes = tick()
from watchers.store import get_watcher
refreshed = get_watcher("stale")
assert refreshed is not None
assert refreshed.last_run_at is not None
assert refreshed.last_run_at > time.time() - 5 # just set
assert outcomes[0]["name"] == "stale"
def test_tick_captures_per_watcher_errors_without_crashing(self, watcher_home):
"""A crashing watcher doesn't kill the whole tick."""
for i in range(3):
save_watcher(WatcherSubscription(name=f"w{i}", provider="http_json"))
def fake_run(sub, *, now=None, adapters=None, loop=None):
if sub.name == "w1":
return {"name": sub.name, "status": "error", "new_events": 0,
"error": "boom"}
return {"name": sub.name, "status": "ok", "new_events": 0, "error": None}
with patch("watchers.engine.run_watcher", side_effect=fake_run):
outcomes = tick()
by_name = {o["name"]: o for o in outcomes}
assert by_name["w0"]["status"] == "ok"
assert by_name["w1"]["status"] == "error"
assert by_name["w2"]["status"] == "ok"
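
The `TestRenderPrompt` cases above pin down three behaviours: a default template when none is given, substitution of `{name}`, `{count}` and `{items_json}`, and unknown placeholders passing through untouched. One way to get all three is `str.format_map` with a dict that returns the placeholder itself for missing keys. This is a sketch of that technique, not necessarily how `_render_prompt` is implemented; the default template wording and the names `render_prompt_sketch` and `_PassThrough` are assumptions.

```python
import json
from typing import Any, Dict, List


class _PassThrough(dict):
    """Dict that echoes unknown placeholders back as '{key}'."""

    def __missing__(self, key: str) -> str:
        return "{" + key + "}"


def render_prompt_sketch(template: str, items: List[Dict[str, Any]], name: str) -> str:
    if not template:
        # Assumed default wording; the real default may differ.
        template = "Watcher '{name}' found {count} new event(s):\n{items_json}"
    values = _PassThrough(
        name=name,
        count=len(items),
        items_json=json.dumps(items, indent=2),
    )
    return template.format_map(values)
```

A known limit of this approach: templates containing literal braces, such as an inline JSON example, still go through the format-string parser and may raise `ValueError`, so they would need `{{` and `}}` escaping.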

tests/watchers/test_providers.py

@@ -0,0 +1,221 @@
"""Tests for watchers/providers.py — watermark dedup, first-run baseline, provider registry."""
from __future__ import annotations
import json
from unittest.mock import patch
import pytest
from watchers.providers import (
PROVIDERS,
ProviderError,
register,
resolve_provider,
)
class TestProviderRegistry:
def test_builtin_providers_registered(self):
for name in ("http_json", "rss", "github"):
assert name in PROVIDERS
assert callable(PROVIDERS[name])
def test_resolve_provider_case_insensitive(self):
assert resolve_provider("HTTP_JSON") is PROVIDERS["http_json"]
assert resolve_provider("Http_Json") is PROVIDERS["http_json"]
def test_resolve_unknown_raises_keyerror(self):
with pytest.raises(KeyError, match="Unknown watcher provider"):
resolve_provider("does-not-exist")
def test_register_adds_custom_provider(self):
def custom(_config, _watermark):
return [], {}
try:
register("unit-test-provider", custom)
assert resolve_provider("unit-test-provider") is custom
finally:
PROVIDERS.pop("unit-test-provider", None)
class TestHttpJsonProvider:
"""The http_json provider: list of items → dedup by id_field."""
def _call(self, response_body, config, watermark):
"""Invoke the provider with a mocked HTTP response."""
provider = resolve_provider("http_json")
with patch(
"watchers.providers._http_get",
return_value=json.dumps(response_body).encode("utf-8"),
):
return provider(config, watermark)
def test_first_run_records_baseline_without_emitting(self):
"""First poll of a new watcher must NOT replay the existing feed."""
items = [{"id": 1}, {"id": 2}, {"id": 3}]
new_items, wm = self._call(items, {"url": "x"}, {})
assert new_items == []
assert sorted(wm["seen_ids"]) == ["1", "2", "3"]
def test_subsequent_poll_emits_new_items_only(self):
# Baseline recorded from a prior poll.
wm = {"seen_ids": ["1", "2", "3"]}
items = [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}, {"id": 5}]
new_items, new_wm = self._call(items, {"url": "x"}, wm)
assert [i["id"] for i in new_items] == [4, 5]
assert "4" in new_wm["seen_ids"]
assert "5" in new_wm["seen_ids"]
def test_idempotent_on_empty_delta(self):
wm = {"seen_ids": ["1", "2"]}
items = [{"id": 1}, {"id": 2}]
new_items, _ = self._call(items, {"url": "x"}, wm)
assert new_items == []
def test_items_path_dotted_lookup(self):
body = {"data": {"results": [{"id": 1}, {"id": 2}]}}
_, wm = self._call(body, {"url": "x", "items_path": "data.results"}, {})
assert sorted(wm["seen_ids"]) == ["1", "2"]
def test_missing_id_field_skips_item(self):
items = [{"id": 1}, {"other": 2}, {"id": 3}]
_, wm = self._call(items, {"url": "x"}, {})
# Item without id is dropped entirely.
assert sorted(wm["seen_ids"]) == ["1", "3"]
def test_custom_id_field(self):
items = [{"uuid": "a"}, {"uuid": "b"}]
_, wm = self._call(items, {"url": "x", "id_field": "uuid"}, {})
assert sorted(wm["seen_ids"]) == ["a", "b"]
def test_max_seen_caps_watermark_memory(self):
# 600 items with max_seen=100 → only keep last 100 after cap.
items = [{"id": i} for i in range(600)]
_, wm = self._call(items, {"url": "x", "max_seen": 100}, {})
assert len(wm["seen_ids"]) == 100
def test_raises_provider_error_on_non_list_result(self):
provider = resolve_provider("http_json")
with patch("watchers.providers._http_get", return_value=b'{"not": "a list"}'):
with pytest.raises(ProviderError, match="did not resolve to a list"):
provider({"url": "x", "items_path": "not"}, {})
def test_missing_url_raises_provider_error(self):
provider = resolve_provider("http_json")
with pytest.raises(ProviderError, match="'url' is required"):
provider({}, {})
def test_invalid_json_raises_provider_error(self):
provider = resolve_provider("http_json")
with patch("watchers.providers._http_get", return_value=b"not json"):
with pytest.raises(ProviderError, match="not valid JSON"):
provider({"url": "x"}, {})
class TestRssProvider:
SAMPLE_RSS = b"""<?xml version="1.0"?>
<rss version="2.0">
<channel>
<title>Example</title>
<item>
<title>First post</title>
<link>https://example.com/1</link>
<guid>post-1</guid>
<description>Hello</description>
<pubDate>Mon, 05 May 2026 10:00:00 GMT</pubDate>
</item>
<item>
<title>Second post</title>
<link>https://example.com/2</link>
<guid>post-2</guid>
<description>World</description>
<pubDate>Tue, 06 May 2026 10:00:00 GMT</pubDate>
</item>
</channel>
</rss>
"""
SAMPLE_ATOM = b"""<?xml version="1.0"?>
<feed xmlns="http://www.w3.org/2005/Atom">
<entry>
<id>atom-1</id>
<title>Atom One</title>
<link href="https://example.com/a1"/>
<summary>Atom summary</summary>
<updated>2026-05-07T12:00:00Z</updated>
</entry>
</feed>
"""
def test_rss_first_run_baseline(self):
provider = resolve_provider("rss")
with patch("watchers.providers._http_get", return_value=self.SAMPLE_RSS):
new_items, wm = provider({"url": "http://f.example/feed"}, {})
assert new_items == []
assert "post-1" in wm["seen_guids"]
assert "post-2" in wm["seen_guids"]
def test_rss_subsequent_run_emits_only_new(self):
provider = resolve_provider("rss")
wm = {"seen_guids": ["post-1"]} # only post-1 seen previously
with patch("watchers.providers._http_get", return_value=self.SAMPLE_RSS):
new_items, new_wm = provider({"url": "http://f.example/feed"}, wm)
assert len(new_items) == 1
assert new_items[0]["id"] == "post-2"
assert new_items[0]["title"] == "Second post"
assert new_items[0]["url"] == "https://example.com/2"
def test_atom_format_parses(self):
provider = resolve_provider("rss")
with patch("watchers.providers._http_get", return_value=self.SAMPLE_ATOM):
_, wm = provider({"url": "http://f/atom"}, {})
assert "atom-1" in wm["seen_guids"]
def test_invalid_xml_raises_provider_error(self):
provider = resolve_provider("rss")
with patch("watchers.providers._http_get", return_value=b"<not valid"):
with pytest.raises(ProviderError, match="invalid XML"):
provider({"url": "x"}, {})
class TestGithubProvider:
def test_rejects_invalid_repo_format(self):
provider = resolve_provider("github")
with pytest.raises(ProviderError, match="must be 'owner/name'"):
provider({"repo": "no-slash-here"}, {})
def test_rejects_unknown_scope(self):
provider = resolve_provider("github")
with pytest.raises(ProviderError, match="scope must be one of"):
provider({"repo": "a/b", "scope": "banana"}, {})
def test_requires_repo_or_search(self):
provider = resolve_provider("github")
with pytest.raises(ProviderError, match="'repo' or 'search' is required"):
provider({}, {})
def test_github_dedups_by_id_and_baselines_on_first_run(self):
provider = resolve_provider("github")
fake_issues = [
{"id": 100, "number": 1, "title": "A", "html_url": "u1", "state": "open",
"user": {"login": "alice"}, "created_at": "t", "body": "..."},
{"id": 200, "number": 2, "title": "B", "html_url": "u2", "state": "closed",
"user": {"login": "bob"}, "created_at": "t", "body": "..."},
]
with patch("watchers.providers._http_get", return_value=json.dumps(fake_issues).encode()):
new_items, wm = provider({"repo": "a/b", "scope": "issues"}, {})
assert new_items == []
assert sorted(wm["seen_ids"]) == ["100", "200"]
# Next poll with a new issue on top.
fake_issues.insert(0, {
"id": 300, "number": 3, "title": "C", "html_url": "u3", "state": "open",
"user": {"login": "carol"}, "created_at": "t", "body": "fresh"
})
with patch("watchers.providers._http_get", return_value=json.dumps(fake_issues).encode()):
new_items, _ = provider({"repo": "a/b", "scope": "issues"}, wm)
assert len(new_items) == 1
assert new_items[0]["id"] == "300"
assert new_items[0]["author"] == "carol"
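
The RSS tests above expect one provider to handle both RSS 2.0 and Atom and to normalise entries to `id`, `title` and `url`. A minimal sketch of that normalisation with the standard library's `xml.etree.ElementTree` follows. The function name `parse_feed_sketch`, the fallback from `<guid>` to `<link>`, and raising `ValueError` (where the real provider raises `ProviderError`) are assumptions made to keep the example self-contained.

```python
import xml.etree.ElementTree as ET
from typing import Dict, List

ATOM_NS = "{http://www.w3.org/2005/Atom}"


def parse_feed_sketch(raw: bytes) -> List[Dict[str, str]]:
    """Parse RSS 2.0 or Atom bytes into [{'id', 'title', 'url'}, ...]."""
    try:
        root = ET.fromstring(raw)
    except ET.ParseError as exc:
        raise ValueError(f"invalid XML: {exc}") from exc

    items: List[Dict[str, str]] = []
    if root.tag == f"{ATOM_NS}feed":
        for entry in root.findall(f"{ATOM_NS}entry"):
            link = entry.find(f"{ATOM_NS}link")
            items.append({
                "id": (entry.findtext(f"{ATOM_NS}id") or "").strip(),
                "title": (entry.findtext(f"{ATOM_NS}title") or "").strip(),
                "url": link.get("href", "") if link is not None else "",
            })
    else:
        for item in root.iter("item"):
            url = (item.findtext("link") or "").strip()
            # Assumption: fall back to the link when a feed omits <guid>.
            guid = (item.findtext("guid") or url).strip()
            items.append({
                "id": guid,
                "title": (item.findtext("title") or "").strip(),
                "url": url,
            })
    # Entries with no usable ID cannot be deduplicated, so drop them.
    return [i for i in items if i["id"]]
```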

tests/watchers/test_store.py

@@ -0,0 +1,103 @@
"""Tests for watchers/store.py — subscription CRUD + watermark persistence."""
from __future__ import annotations
import json
from pathlib import Path
import pytest
from watchers.store import (
WatcherSubscription,
delete_watcher,
get_watcher,
list_watchers,
load_watermark,
save_watcher,
save_watermark,
)
@pytest.fixture
def watcher_home(tmp_path, monkeypatch):
"""Isolated HERMES_HOME so tests don't stomp user data."""
home = tmp_path / ".hermes"
home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(home))
# Ensure any cached hermes_constants path is recomputed.
import importlib
import hermes_constants
importlib.reload(hermes_constants)
return home
class TestWatcherSubscription:
def test_from_dict_fills_defaults_for_missing_keys(self):
sub = WatcherSubscription.from_dict({"name": "x", "provider": "http_json"})
assert sub.name == "x"
assert sub.provider == "http_json"
assert sub.interval_seconds == 300
assert sub.deliver == "origin"
assert sub.deliver_only is False
assert sub.enabled is True
assert sub.last_run_at is None
def test_round_trip_preserves_fields(self, watcher_home):
sub = WatcherSubscription(
name="github-issues",
provider="github",
config={"repo": "foo/bar", "scope": "issues"},
interval_seconds=120,
prompt="New: {items_json}",
skills=["a", "b"],
deliver="telegram,discord",
deliver_only=True,
)
save_watcher(sub)
fetched = get_watcher("github-issues")
assert fetched is not None
assert fetched.name == "github-issues"
assert fetched.provider == "github"
assert fetched.config == {"repo": "foo/bar", "scope": "issues"}
assert fetched.interval_seconds == 120
assert fetched.skills == ["a", "b"]
assert fetched.deliver == "telegram,discord"
assert fetched.deliver_only is True
def test_delete_clears_watermark(self, watcher_home):
sub = WatcherSubscription(name="w", provider="http_json", config={"url": "x"})
save_watcher(sub)
save_watermark("w", {"seen_ids": ["1", "2"]})
assert (watcher_home / "watchers" / "w.watermark.json").exists()
assert delete_watcher("w") is True
assert get_watcher("w") is None
assert not (watcher_home / "watchers" / "w.watermark.json").exists()
def test_delete_nonexistent_returns_false(self, watcher_home):
assert delete_watcher("nope") is False
def test_list_watchers_returns_all(self, watcher_home):
for i in range(3):
save_watcher(
WatcherSubscription(name=f"w{i}", provider="http_json", config={"url": f"u{i}"})
)
names = sorted(w.name for w in list_watchers())
assert names == ["w0", "w1", "w2"]
def test_watermark_is_opaque_dict_preserved_verbatim(self, watcher_home):
Path(watcher_home / "watchers").mkdir(parents=True, exist_ok=True)
wm = {"seen_ids": ["a", "b"], "last_polled_at": 12345.678, "custom": {"nested": True}}
save_watermark("test", wm)
assert load_watermark("test") == wm
def test_load_watermark_returns_empty_when_missing(self, watcher_home):
assert load_watermark("nonexistent") == {}
def test_subscriptions_file_is_valid_json(self, watcher_home):
save_watcher(WatcherSubscription(name="a", provider="http_json", config={"url": "x"}))
raw = (watcher_home / "watchers.json").read_text(encoding="utf-8")
parsed = json.loads(raw)
assert "a" in parsed
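
The store tests above treat a watermark as an opaque dict persisted at `<home>/watchers/<name>.watermark.json`, with a missing file reading back as `{}`. A self-contained sketch of that contract is below. The function names and the explicit `root` parameter are hypothetical, and the write-to-temp-then-`os.replace` step is an assumption about how one might avoid torn files, not a claim about what `watchers/store.py` does.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict


def save_watermark_sketch(root: Path, name: str, watermark: Dict[str, Any]) -> Path:
    """Persist a watermark dict as <root>/watchers/<name>.watermark.json."""
    directory = root / "watchers"
    directory.mkdir(parents=True, exist_ok=True)
    target = directory / f"{name}.watermark.json"
    # Write to a temp file in the same directory, then rename over the
    # target, so a reader never sees a half-written file.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        json.dump(watermark, fh)
    os.replace(tmp_path, target)
    return target


def load_watermark_sketch(root: Path, name: str) -> Dict[str, Any]:
    """Return the stored watermark, or {} if it is missing or unreadable."""
    target = root / "watchers" / f"{name}.watermark.json"
    try:
        return json.loads(target.read_text(encoding="utf-8"))
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
```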

watchers/__init__.py Normal file

@@ -0,0 +1,55 @@
"""Watchers — interval-polling with watermark dedup, inspired by Vellum Assistant's watcher system.
A watcher periodically fetches data from an external source, compares new items
against a stored watermark, and — if new items exist — hands them to the agent
(or delivers them verbatim, in no-agent mode) as the prompt context.
Watchers are the **pull-based** sibling of webhooks. Webhooks: source pushes →
agent reacts. Watchers: scheduler pulls on an interval → agent reacts to what's
new. Cron is the time-based sibling: scheduler fires → agent runs a prompt,
regardless of whether anything changed.
Key design choices:
- Subscriptions live in ``~/.hermes/watchers.json`` (parallels webhooks).
- Watermarks live in ``~/.hermes/watchers/<name>.watermark.json`` so provider
state can be inspected / reset per-watcher without touching the subscription
file.
- Providers implement ``fetch_new(config, watermark) -> (items, new_watermark)``
and are kept deliberately stateless so the scheduler can call them from any
thread.
- Delivery reuses the cron ``deliver`` plumbing: ``local`` / ``origin`` /
platform targets / ``multi`` / ``all``.
Ship-with providers: ``http_json``, ``rss``, ``github``. Custom providers can
be registered via ``watchers.providers.register()``.
"""
from watchers.providers import ( # noqa: F401
PROVIDERS,
ProviderError,
register,
resolve_provider,
)
from watchers.store import ( # noqa: F401
WatcherSubscription,
delete_watcher,
get_watcher,
list_watchers,
load_watermark,
save_watcher,
save_watermark,
)
__all__ = [
"WatcherSubscription",
"PROVIDERS",
"ProviderError",
"delete_watcher",
"get_watcher",
"list_watchers",
"load_watermark",
"register",
"resolve_provider",
"save_watcher",
"save_watermark",
]

290
watchers/engine.py Normal file

@@ -0,0 +1,290 @@
"""Watcher engine — the tick-time poller that runs every watcher whose
interval has elapsed, resolves any new items, and delivers them (verbatim
in ``deliver_only`` mode, or as prompt context to a short-lived agent).
Safe to call from the cron scheduler tick loop: guarded by its own file
lock so concurrent ticks (gateway in-process + standalone daemon) don't
double-fire.
"""
from __future__ import annotations
import json
import logging
import os
import time
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional
from hermes_constants import get_hermes_home
from watchers.providers import ProviderError, resolve_provider
from watchers.store import (
WatcherSubscription,
list_watchers,
load_watermark,
save_watcher,
save_watermark,
)
logger = logging.getLogger(__name__)
try:
import fcntl # type: ignore
except ImportError: # pragma: no cover - Windows fallback
fcntl = None # type: ignore
try:
import msvcrt # type: ignore
except ImportError: # pragma: no cover - Unix
msvcrt = None # type: ignore
def _lock_path() -> Path:
d = get_hermes_home() / "watchers"
d.mkdir(parents=True, exist_ok=True)
return d / ".tick.lock"
def _is_due(sub: WatcherSubscription, *, now: Optional[float] = None) -> bool:
"""Has enough time elapsed since the last run?"""
if not sub.enabled:
return False
now = now or time.time()
if sub.last_run_at is None:
return True
return (now - sub.last_run_at) >= max(5, int(sub.interval_seconds))
def _render_prompt(template: str, items: List[Dict[str, Any]], sub: WatcherSubscription) -> str:
"""Render the watcher prompt template with the new items.
If ``template`` is empty, a sensible default is produced so the user
doesn't have to write a prompt for a quick ``hermes watch add``.
``{items_json}`` / ``{name}`` / ``{count}`` are recognized placeholders;
unknown placeholders are left verbatim so they don't raise at render
time.
"""
payload = json.dumps(items, indent=2, default=str)
if not template:
return (
f"{sub.name}: {len(items)} new event(s) from the {sub.provider} watcher.\n\n"
f"Items:\n{payload}"
)
placeholders = {
"items_json": payload,
"name": sub.name,
"count": str(len(items)),
}
def _replace(match: "re.Match[str]") -> str: # type: ignore[name-defined]
key = match.group(1)
return placeholders.get(key, match.group(0))
import re
return re.sub(r"\{([a-zA-Z_][a-zA-Z0-9_]*)\}", _replace, template)
# ---------------------------------------------------------------------------
# Delivery — re-uses cron delivery plumbing so deliver="multi" etc. work
# identically. We piggyback on cron's ``_deliver_result`` by staging a
# synthetic "job" dict that looks like a no-agent cron job.
# ---------------------------------------------------------------------------
def _deliver_payload(
sub: WatcherSubscription,
content: str,
*,
adapters: Optional[Dict[str, Any]] = None,
loop: Any = None,
) -> Optional[str]:
"""Deliver ``content`` using the cron delivery plumbing.
Returns None on success or an error string.
"""
try:
from cron.scheduler import _deliver_result
except ImportError as e:
return f"cron delivery unavailable: {e}"
synthetic_job = {
"id": f"watcher:{sub.name}",
"name": f"watch/{sub.name}",
"deliver": sub.deliver,
"origin": None,
}
try:
return _deliver_result(synthetic_job, content, adapters=adapters, loop=loop)
except Exception as e:
logger.exception("Watcher %s: delivery crashed: %s", sub.name, e)
return f"delivery crashed: {e}"
# ---------------------------------------------------------------------------
# Agent dispatch — when deliver_only is False, we hand the items to a
# short-lived AIAgent (same pattern cron uses for agent-mode jobs). In
# deliver_only mode, the rendered prompt is sent verbatim as the message.
# ---------------------------------------------------------------------------
def _run_agent_for_watcher(
sub: WatcherSubscription,
prompt: str,
*,
adapters: Optional[Dict[str, Any]] = None,
loop: Any = None,
) -> str:
"""Run a one-shot agent session for the watcher and return its final response.
Errors are captured and returned as error strings so the tick loop doesn't
crash on one misbehaving watcher.
"""
try:
from run_agent import AIAgent
except ImportError as e:
return f"[watcher error: AIAgent unavailable: {e}]"
try:
agent = AIAgent(
quiet_mode=True,
platform="watcher",
session_id=f"watcher-{sub.name}-{int(time.time())}",
skip_context_files=False,
skip_memory=True,
save_trajectories=False,
)
if sub.skills:
# NOTE: this only probes that the skill tooling imports; ``sub.skills``
# is not passed to the agent here. A failed import is ignored because
# the watcher generally runs unattended.
try:
from agent.skill_tools import skill_view as _skv # noqa: F401
except Exception:
pass
return agent.chat(prompt) or ""
except Exception as e:
logger.exception("Watcher %s: agent crashed: %s", sub.name, e)
return f"[watcher error: agent crashed: {e}]"
# ---------------------------------------------------------------------------
# Public entrypoints
# ---------------------------------------------------------------------------
def run_watcher(
sub: WatcherSubscription,
*,
now: Optional[float] = None,
adapters: Optional[Dict[str, Any]] = None,
loop: Any = None,
) -> Dict[str, Any]:
"""Execute one poll cycle for a single watcher.
Returns a dict summarizing the outcome::
{
"name": "...", "status": "ok"|"error"|"skipped",
"new_events": N, "error": None|str,
}
The watermark is persisted here via ``save_watermark``. Subscription-level
fields (``last_run_at``, ``last_error``, ``last_event_count``) are not
touched; the caller (see ``tick``) is responsible for persisting those.
"""
now = now or time.time()
result = {"name": sub.name, "status": "ok", "new_events": 0, "error": None}
watermark = load_watermark(sub.name)
try:
provider = resolve_provider(sub.provider)
except KeyError as e:
result.update(status="error", error=str(e))
return result
try:
new_items, new_watermark = provider(sub.config, watermark)
except ProviderError as e:
logger.warning("Watcher %s: provider error: %s", sub.name, e)
result.update(status="error", error=str(e))
return result
except Exception as e: # defensive — never let a provider crash the tick
logger.exception("Watcher %s: unexpected provider failure: %s", sub.name, e)
result.update(status="error", error=f"unexpected: {e}")
return result
# Persist the watermark even on empty polls so ``last_polled_at`` etc.
# advance (useful for observability).
save_watermark(sub.name, new_watermark)
if not new_items:
result["new_events"] = 0
return result
result["new_events"] = len(new_items)
prompt = _render_prompt(sub.prompt, new_items, sub)
if sub.deliver_only:
err = _deliver_payload(sub, prompt, adapters=adapters, loop=loop)
if err:
result.update(status="error", error=err)
return result
# Agent mode: run one-shot agent, then deliver its final output.
response = _run_agent_for_watcher(sub, prompt, adapters=adapters, loop=loop)
if response:
err = _deliver_payload(sub, response, adapters=adapters, loop=loop)
if err:
result.update(status="error", error=err)
return result
def tick(
*,
now: Optional[float] = None,
adapters: Optional[Dict[str, Any]] = None,
loop: Any = None,
verbose: bool = False,
) -> List[Dict[str, Any]]:
"""Poll every due watcher and return per-watcher outcomes.
Uses a file lock so concurrent ticks (gateway + daemon) don't double-fire.
Returns an empty list silently if another tick holds the lock.
"""
lock_file = _lock_path()
lock_fd = None
try:
lock_fd = open(lock_file, "w")
if fcntl:
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
elif msvcrt:
msvcrt.locking(lock_fd.fileno(), msvcrt.LK_NBLCK, 1)
except (OSError, IOError):
logger.debug("Watcher tick skipped — another instance holds the lock")
if lock_fd is not None:
lock_fd.close()
return []
try:
outcomes: List[Dict[str, Any]] = []
for sub in list_watchers():
if not _is_due(sub, now=now):
continue
if verbose:
logger.info("Watcher %s: polling (%s)", sub.name, sub.provider)
outcome = run_watcher(sub, now=now, adapters=adapters, loop=loop)
# Persist timing + last error back to the subscription.
sub.last_run_at = now or time.time()
sub.last_error = outcome.get("error")
sub.last_event_count = outcome.get("new_events", 0)
save_watcher(sub)
outcomes.append(outcome)
return outcomes
finally:
if lock_fd is not None:
try:
if fcntl:
fcntl.flock(lock_fd, fcntl.LOCK_UN)
except Exception:
pass
lock_fd.close()

385
watchers/providers.py Normal file

@@ -0,0 +1,385 @@
"""Built-in watcher providers.
Each provider is a callable ``fetch_new(config, watermark) -> (items, new_watermark)``:
- ``config``: the provider-specific config dict from the subscription.
- ``watermark``: the opaque dict the provider returned last time (empty on first run).
- Returns a tuple of ``(new_items, new_watermark)``.
``new_items`` is a list of dicts (shape is provider-defined but should at
minimum include a human-readable ``title`` and ``url`` field where
applicable). ``new_watermark`` is persisted verbatim and handed back to the
provider on the next tick.
Providers MUST be idempotent — if nothing new is available, return
``([], watermark)``. Raising ``ProviderError`` marks the watcher as errored
for the tick but preserves the watermark (no data loss on transient
failures).
"""
from __future__ import annotations
import json
import logging
import re
import time
import urllib.error
import urllib.request
from typing import Any, Callable, Dict, List, Optional, Tuple
from xml.etree import ElementTree as ET
logger = logging.getLogger(__name__)
class ProviderError(Exception):
"""Raised when a provider fetch fails transiently.
Persistent config errors (missing required fields, bad URLs) should
surface as ProviderError too; the watcher engine records ``last_error``
on the subscription so ``hermes watch list`` shows the failure.
"""
# (config, watermark) -> (items, new_watermark)
ProviderFn = Callable[[Dict[str, Any], Dict[str, Any]], Tuple[List[Dict[str, Any]], Dict[str, Any]]]
PROVIDERS: Dict[str, ProviderFn] = {}
def register(name: str, fn: ProviderFn) -> None:
"""Register a custom provider under ``name``.
Plugins can call this at import time to add watcher providers. Names are
case-insensitive; registering an existing name replaces the previous provider.
"""
key = name.lower()
PROVIDERS[key] = fn
def resolve_provider(name: str) -> ProviderFn:
"""Look up a provider by name. Raises KeyError if unknown."""
key = (name or "").lower()
if key not in PROVIDERS:
raise KeyError(f"Unknown watcher provider: {name!r}. Known: {sorted(PROVIDERS)}")
return PROVIDERS[key]
# ---------------------------------------------------------------------------
# Shared HTTP helper — honors optional header/timeout config across providers.
# ---------------------------------------------------------------------------
def _http_get(url: str, *, headers: Optional[Dict[str, str]] = None, timeout: float = 20.0) -> bytes:
req = urllib.request.Request(url)
req.add_header("User-Agent", "Hermes-Watcher/1.0")
if headers:
for k, v in headers.items():
req.add_header(k, v)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
return resp.read()
except urllib.error.HTTPError as e:
raise ProviderError(f"HTTP {e.code} from {url}") from e
except urllib.error.URLError as e:
raise ProviderError(f"HTTP error: {e.reason} ({url})") from e
except (TimeoutError, OSError) as e:
raise ProviderError(f"Network error: {e} ({url})") from e
# ---------------------------------------------------------------------------
# Provider: http_json
#
# Polls a JSON endpoint and treats the response as either:
# - a top-level list of items, or
# - a dict with an ``items_path`` pointing at the list via dotted keys.
#
# Dedup is by a configurable ``id_field`` (default ``"id"``). Watermark stores
# a set of seen IDs, capped at ``max_seen`` (default 500) to bound memory.
# ---------------------------------------------------------------------------
def _dig(obj: Any, path: str) -> Any:
"""Dotted-path lookup: _dig({"a":{"b":[1,2]}}, "a.b") -> [1,2]."""
if not path:
return obj
cur = obj
for part in path.split("."):
if isinstance(cur, dict) and part in cur:
cur = cur[part]
else:
return None
return cur
def _provider_http_json(
config: Dict[str, Any], watermark: Dict[str, Any]
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
url = config.get("url")
if not url:
raise ProviderError("http_json: 'url' is required")
id_field = config.get("id_field", "id")
items_path = config.get("items_path", "")
headers = config.get("headers") or {}
max_seen = int(config.get("max_seen", 500))
timeout = float(config.get("timeout", 20.0))
raw = _http_get(url, headers=headers, timeout=timeout)
try:
data = json.loads(raw.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError) as e:
raise ProviderError(f"http_json: response is not valid JSON: {e}") from e
items = _dig(data, items_path) if items_path else data
if not isinstance(items, list):
raise ProviderError(
f"http_json: items_path={items_path!r} did not resolve to a list"
f" (got {type(items).__name__})"
)
seen_ids = set(watermark.get("seen_ids") or [])
is_first_run = not watermark # empty watermark = never polled
new_items: List[Dict[str, Any]] = []
new_seen_ids: List[str] = []
for item in items:
if not isinstance(item, dict):
continue
ident = item.get(id_field)
if ident is None:
continue
ident_str = str(ident)
new_seen_ids.append(ident_str)
if ident_str in seen_ids:
continue
# On first run, record all IDs but don't emit any as "new" events.
# Otherwise the first tick of a watcher would replay the entire feed.
if is_first_run:
continue
new_items.append(item)
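# Cap the stored ID list. Response order is provider-defined, so keep the
# previously stored order, append newly seen IDs, and trim from the head.
# (Iterating the ``seen_ids`` set here would lose that order.)
previous_ids = [str(i) for i in (watermark.get("seen_ids") or [])]
combined = previous_ids + [i for i in new_seen_ids if i not in seen_ids]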
if len(combined) > max_seen:
combined = combined[-max_seen:]
return new_items, {"seen_ids": combined, "last_polled_at": time.time()}
register("http_json", _provider_http_json)
# ---------------------------------------------------------------------------
# Provider: rss (Atom + RSS 2.0)
#
# Watermark = {"seen_guids": [...]}. On first run, record all existing GUIDs
# as seen so the watcher only emits posts published AFTER it was created.
# ---------------------------------------------------------------------------
def _parse_rss_entries(xml_bytes: bytes) -> List[Dict[str, Any]]:
"""Return a list of {id, title, url, published, summary} dicts.
Handles both RSS 2.0 ``<item>`` and Atom ``<entry>``. Namespaces are
stripped for cleaner lookups.
"""
def _strip_ns(tag: str) -> str:
return tag.split("}", 1)[1] if "}" in tag else tag
try:
root = ET.fromstring(xml_bytes)
except ET.ParseError as e:
raise ProviderError(f"rss: invalid XML: {e}") from e
entries: List[Dict[str, Any]] = []
# RSS 2.0 nests items as <rss><channel><item>; Atom uses <feed><entry>.
for item in root.iter():
tag = _strip_ns(item.tag)
if tag not in ("item", "entry"):
continue
children = {_strip_ns(c.tag): c for c in item}
# ElementTree Elements with no children are *falsy* — use explicit
# `is not None` checks when picking between possible tags.
guid_el = children.get("guid")
if guid_el is None:
guid_el = children.get("id")
link_el = children.get("link")
if link_el is not None:
# Atom: link is empty element with href attr; RSS: link has text.
href = link_el.attrib.get("href") or (link_el.text or "").strip()
else:
href = ""
guid = (guid_el.text.strip() if guid_el is not None and guid_el.text else "") or href
if not guid:
continue
title_el = children.get("title")
title = (title_el.text or "").strip() if title_el is not None else ""
pub_el = children.get("pubDate")
if pub_el is None:
pub_el = children.get("published")
if pub_el is None:
pub_el = children.get("updated")
published = (pub_el.text or "").strip() if pub_el is not None else ""
summ_el = children.get("description")
if summ_el is None:
summ_el = children.get("summary")
summary = (summ_el.text or "").strip() if summ_el is not None else ""
entries.append(
{
"id": guid,
"title": title,
"url": href,
"published": published,
"summary": summary,
}
)
return entries
def _provider_rss(
config: Dict[str, Any], watermark: Dict[str, Any]
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
url = config.get("url")
if not url:
raise ProviderError("rss: 'url' is required")
headers = config.get("headers") or {}
max_seen = int(config.get("max_seen", 500))
timeout = float(config.get("timeout", 20.0))
raw = _http_get(url, headers=headers, timeout=timeout)
entries = _parse_rss_entries(raw)
seen = set(watermark.get("seen_guids") or [])
is_first_run = not watermark
new_items: List[Dict[str, Any]] = []
new_guids: List[str] = []
for entry in entries:
guid = entry["id"]
new_guids.append(guid)
if guid in seen:
continue
if is_first_run:
continue
new_items.append(entry)
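# Start from the stored list (not the set) so insertion order survives the trim.
combined = list(watermark.get("seen_guids") or []) + [g for g in new_guids if g not in seen]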
if len(combined) > max_seen:
combined = combined[-max_seen:]
return new_items, {"seen_guids": combined, "last_polled_at": time.time()}
register("rss", _provider_rss)
# ---------------------------------------------------------------------------
# Provider: github
#
# Polls either:
# - ``repo`` mode: https://api.github.com/repos/<owner>/<repo>/{issues,pulls,releases,commits}
# - ``search`` mode: https://api.github.com/search/issues?q=<query>
#
# Authenticates with ``GITHUB_TOKEN`` if present (avoids 60 req/hr anon limit).
# ---------------------------------------------------------------------------
_GITHUB_SCOPES = {"issues", "pulls", "releases", "commits"}
def _provider_github(
config: Dict[str, Any], watermark: Dict[str, Any]
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
import os
repo = config.get("repo")
scope = (config.get("scope") or "issues").lower()
search_query = config.get("search")
per_page = int(config.get("per_page", 30))
if not repo and not search_query:
raise ProviderError("github: one of 'repo' or 'search' is required")
if scope not in _GITHUB_SCOPES and not search_query:
raise ProviderError(
f"github: scope must be one of {sorted(_GITHUB_SCOPES)} (got {scope!r})"
)
if not re.fullmatch(r"[A-Za-z0-9._-]+/[A-Za-z0-9._-]+", repo or "") and repo:
raise ProviderError(f"github: repo must be 'owner/name' (got {repo!r})")
if search_query:
url = f"https://api.github.com/search/issues?q={urllib.parse.quote(search_query)}&per_page={per_page}"
items_path = "items"
elif scope == "commits":
url = f"https://api.github.com/repos/{repo}/commits?per_page={per_page}"
items_path = ""
else:
url = f"https://api.github.com/repos/{repo}/{scope}?per_page={per_page}&state=all"
items_path = ""
headers = {"Accept": "application/vnd.github+json"}
token = config.get("token") or os.getenv("GITHUB_TOKEN") or os.getenv("GH_TOKEN")
if token:
headers["Authorization"] = f"Bearer {token}"
raw = _http_get(url, headers=headers, timeout=30.0)
try:
data = json.loads(raw.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError) as e:
raise ProviderError(f"github: response is not valid JSON: {e}") from e
items = _dig(data, items_path) if items_path else data
if not isinstance(items, list):
raise ProviderError(
f"github: expected a list; got {type(items).__name__}"
)
id_field = "sha" if scope == "commits" else "id"
seen = set(str(x) for x in (watermark.get("seen_ids") or []))
is_first_run = not watermark
new_items: List[Dict[str, Any]] = []
all_ids: List[str] = []
for item in items:
if not isinstance(item, dict):
continue
ident = str(item.get(id_field, "")) or ""
if not ident:
continue
all_ids.append(ident)
if ident in seen or is_first_run:
continue
# Flatten the interesting fields so the prompt template is readable.
new_items.append(
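{
"id": ident,
# Commits have no title/name: use the first line of the commit message (a str, not a list slice).
"title": item.get("title") or item.get("name") or ((item.get("commit") or {}).get("message") or "").split("\n", 1)[0],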
"url": item.get("html_url") or item.get("url"),
"number": item.get("number"),
"state": item.get("state"),
"author": (item.get("user") or {}).get("login") or (item.get("author") or {}).get("login"),
"created_at": item.get("created_at") or (item.get("commit") or {}).get("author", {}).get("date"),
"body": (item.get("body") or "")[:2000], # cap so the prompt stays bounded
}
)
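# Start from the stored list (not the set) so insertion order survives the trim.
combined = [str(x) for x in (watermark.get("seen_ids") or [])] + [i for i in all_ids if i not in seen]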
if len(combined) > int(config.get("max_seen", 500)):
combined = combined[-int(config.get("max_seen", 500)):]
return new_items, {"seen_ids": combined, "last_polled_at": time.time()}
register("github", _provider_github)
# urllib.parse is used by ``_provider_github`` to quote the search query. The
# name is resolved at call time, so this late import works; note that
# ``import urllib.request`` above already loads ``urllib.parse`` as well.
import urllib.parse # noqa: E402

172
watchers/store.py Normal file

@@ -0,0 +1,172 @@
"""Watcher subscription + watermark storage.
Subscriptions persist to ``<hermes_home>/watchers.json``. Watermarks live
under ``<hermes_home>/watchers/<name>.watermark.json`` — separate files so
``hermes watch reset <name>`` can nuke a single watermark without touching
the subscription, and so provider state can be inspected in isolation.
"""
from __future__ import annotations
import json
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from hermes_constants import get_hermes_home
from utils import atomic_replace
_SUBSCRIPTIONS_FILENAME = "watchers.json"
@dataclass
class WatcherSubscription:
"""A watcher subscription as persisted to ``watchers.json``.
Mirrors the webhook subscription shape where possible so users who know
one subsystem can reason about the other.
"""
name: str
provider: str
config: Dict[str, Any] = field(default_factory=dict)
interval_seconds: int = 300
prompt: str = ""
skills: List[str] = field(default_factory=list)
deliver: str = "origin"
deliver_only: bool = False
enabled: bool = True
created_at: float = field(default_factory=time.time)
last_run_at: Optional[float] = None
last_error: Optional[str] = None
last_event_count: int = 0
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@classmethod
def from_dict(cls, d: Dict[str, Any]) -> "WatcherSubscription":
"""Tolerant of missing keys — old subscription files gain defaults."""
return cls(
name=d["name"],
provider=d["provider"],
config=dict(d.get("config") or {}),
interval_seconds=int(d.get("interval_seconds", 300)),
prompt=str(d.get("prompt", "")),
skills=list(d.get("skills") or []),
deliver=str(d.get("deliver", "origin")),
deliver_only=bool(d.get("deliver_only", False)),
enabled=bool(d.get("enabled", True)),
created_at=float(d.get("created_at") or time.time()),
last_run_at=(float(d["last_run_at"]) if d.get("last_run_at") else None),
last_error=d.get("last_error"),
last_event_count=int(d.get("last_event_count") or 0),
)
# ---------------------------------------------------------------------------
# Path helpers — resolved at call time so HERMES_HOME overrides work in tests.
# ---------------------------------------------------------------------------
def _subscriptions_path() -> Path:
return get_hermes_home() / _SUBSCRIPTIONS_FILENAME
def _watermark_dir() -> Path:
return get_hermes_home() / "watchers"
def _watermark_path(name: str) -> Path:
return _watermark_dir() / f"{name}.watermark.json"
# ---------------------------------------------------------------------------
# Subscription CRUD
# ---------------------------------------------------------------------------
def _load_all() -> Dict[str, Dict[str, Any]]:
path = _subscriptions_path()
if not path.exists():
return {}
try:
raw = json.loads(path.read_text(encoding="utf-8"))
return raw if isinstance(raw, dict) else {}
except Exception:
return {}
def _save_all(subs: Dict[str, Dict[str, Any]]) -> None:
path = _subscriptions_path()
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(".tmp")
tmp.write_text(json.dumps(subs, indent=2, ensure_ascii=False), encoding="utf-8")
atomic_replace(tmp, path)
def list_watchers() -> List[WatcherSubscription]:
return [WatcherSubscription.from_dict(v) for v in _load_all().values()]
def get_watcher(name: str) -> Optional[WatcherSubscription]:
raw = _load_all().get(name)
return WatcherSubscription.from_dict(raw) if raw else None
def save_watcher(sub: WatcherSubscription) -> None:
all_subs = _load_all()
all_subs[sub.name] = sub.to_dict()
_save_all(all_subs)
def delete_watcher(name: str) -> bool:
all_subs = _load_all()
if name not in all_subs:
return False
del all_subs[name]
_save_all(all_subs)
# Also remove watermark file so re-adding doesn't inherit stale state.
wm = _watermark_path(name)
if wm.exists():
try:
wm.unlink()
except OSError:
pass
return True
# ---------------------------------------------------------------------------
# Watermark store — opaque per-provider state.
# ---------------------------------------------------------------------------
def load_watermark(name: str) -> Dict[str, Any]:
"""Return the persisted watermark dict for ``name`` (empty if unset)."""
path = _watermark_path(name)
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
except Exception:
return {}
def save_watermark(name: str, watermark: Dict[str, Any]) -> None:
"""Persist the watermark dict for ``name``.
``watermark`` is the second element of the provider's ``fetch_new`` return
value. The scheduler persists it verbatim; its shape is entirely up to the
provider.
"""
path = _watermark_path(name)
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(".tmp")
tmp.write_text(
json.dumps(watermark, indent=2, ensure_ascii=False, sort_keys=True),
encoding="utf-8",
)
atomic_replace(tmp, path)


@@ -0,0 +1,122 @@
---
title: Watchers
description: Poll external sources on an interval and trigger the agent when new items appear.
---
# Watchers
Watchers poll an external source on an interval, detect new items via watermark-based dedup, and either deliver those items verbatim or hand them to a short-lived agent for reasoning. They are the **pull-based sibling** of webhooks:
| Trigger | Source | When |
|---|---|---|
| Webhook | external push | whenever the source calls you |
| Cron | time-based | on a schedule, regardless of whether anything changed |
| **Watcher** | **pull on interval** | **only when new data is detected** |
Inspired by Vellum Assistant's watcher system.
## Quick start
```bash
# Watch a GitHub repo for new issues
hermes watch add my-repo \
--provider github --repo NousResearch/hermes-agent --scope issues \
--interval 300 --deliver origin
# Watch an RSS feed, deliver raw (no agent)
hermes watch add hn \
--provider rss --url https://news.ycombinator.com/rss \
--interval 900 --deliver telegram --deliver-only
# Watch any JSON endpoint
hermes watch add api-changes \
--provider http_json --url https://api.example.com/events \
--arg id_field=event_id --arg items_path=data.events \
--interval 60
```
Watchers piggyback on the cron scheduler's tick loop — if cron is running (gateway or `hermes cron tick`), watchers run automatically.
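
A rough sketch of the due check applied to each watcher on a tick. The function name `is_due` and its flat argument list are illustrative, not the engine's actual signature; the 5-second floor mirrors what `watchers/engine.py` does in `_is_due`.

```python
import time
from typing import Optional


def is_due(last_run_at: Optional[float], interval_seconds: int,
           now: Optional[float] = None, enabled: bool = True) -> bool:
    """Return True when a watcher should be polled on this tick."""
    if not enabled:
        return False
    if now is None:
        now = time.time()
    if last_run_at is None:
        # Never polled before: due immediately.
        return True
    # Intervals shorter than 5 seconds are clamped to 5.
    return (now - last_run_at) >= max(5, int(interval_seconds))
```

Because the check only runs when the cron tick fires, the effective polling period is bounded below by the tick frequency as well as by `--interval`.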
## How dedup works
The first poll of a new watcher **records a baseline** and does NOT emit any events. This prevents the first tick from replaying the entire feed. Only items that appear after the baseline are delivered.
Each watcher keeps a **watermark file** at `~/.hermes/watchers/<name>.watermark.json` containing a bounded list of seen IDs. `hermes watch reset <name>` clears it, so the next poll is treated as a first run: it records a fresh baseline and emits nothing.
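
The baseline-then-diff behaviour can be sketched as follows. This is a simplified illustration, not the shipped provider code: the helper name `dedup` is hypothetical, and the built-in providers also store a `last_polled_at` timestamp that is omitted here.

```python
from typing import Any, Dict, List, Tuple


def dedup(items: List[Dict[str, Any]], watermark: Dict[str, Any],
          id_field: str = "id", max_seen: int = 500
          ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Return (new_items, new_watermark) for one poll."""
    previous = [str(i) for i in watermark.get("seen_ids") or []]
    seen = set(previous)
    first_run = not watermark  # empty watermark means never polled
    new_items: List[Dict[str, Any]] = []
    combined = list(previous)  # a list, so insertion order is kept
    for item in items:
        ident = item.get(id_field)
        if ident is None:
            continue
        ident = str(ident)
        if ident in seen:
            continue
        seen.add(ident)
        combined.append(ident)
        if not first_run:
            # On the first run IDs are recorded but nothing is emitted.
            new_items.append(item)
    # Keep only the most recent max_seen IDs (assumes max_seen > 0).
    return new_items, {"seen_ids": combined[-max_seen:]}
```

One consequence of the cap: if a source returns more than `max_seen` items per poll, the oldest IDs fall out of the watermark and can be reported again later, so `max_seen` should comfortably exceed the page size.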
## Providers
### `http_json`
Polls a JSON endpoint and dedups by a configurable ID field.
| Config key | Default | Purpose |
|---|---|---|
| `url` | *(required)* | Endpoint to GET |
| `id_field` | `id` | Field used to dedup items |
| `items_path` | *(empty)* | Dotted path to the list (`data.events` etc.) if the response isn't a top-level list |
| `headers` | `{}` | Dict of HTTP headers |
| `max_seen` | `500` | Cap on the watermark ID set |
| `timeout` | `20.0` | Request timeout (seconds) |
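
To illustrate how `items_path` resolves, here is a minimal dotted-path lookup modelled on the `_dig` helper in `watchers/providers.py`. The sample response and the `event_id` field are made up for the example.

```python
from typing import Any


def dig(obj: Any, path: str) -> Any:
    """Follow dotted keys through nested dicts; None if any key is missing."""
    if not path:
        return obj
    cur = obj
    for part in path.split("."):
        if isinstance(cur, dict) and part in cur:
            cur = cur[part]
        else:
            return None
    return cur


# Hypothetical response body for --arg items_path=data.events
response = {"data": {"events": [{"event_id": "e1"}, {"event_id": "e2"}]}}
events = dig(response, "data.events")
```

The lookup only walks dict keys, so a path cannot index into a list; if the path does not resolve to a list, the provider raises `ProviderError`.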
### `rss`
Parses RSS 2.0 or Atom feeds. Dedups by `<guid>` / `<id>`, falling back to the entry's link when neither is present; entries with no usable ID are skipped.
| Config key | Default | Purpose |
|---|---|---|
| `url` | *(required)* | Feed URL |
| `headers` | `{}` | Optional HTTP headers |
| `max_seen` | `500` | Cap on the watermark GUID set |
| `timeout` | `20.0` | Request timeout (seconds) |
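
As a sketch of how an entry's dedup ID might be chosen, the following follows the same order as the parser in `watchers/providers.py` (`guid`, then `id`, then the link). The function name `entry_ids` is illustrative and the real parser also extracts title, date and summary.

```python
from typing import List
from xml.etree import ElementTree as ET


def entry_ids(xml_text: str) -> List[str]:
    """Return the dedup ID of every RSS <item> / Atom <entry>."""
    def strip_ns(tag: str) -> str:
        return tag.split("}", 1)[1] if "}" in tag else tag

    ids: List[str] = []
    for el in ET.fromstring(xml_text).iter():
        if strip_ns(el.tag) not in ("item", "entry"):
            continue
        children = {strip_ns(c.tag): c for c in el}
        # Childless Elements are falsy, so compare against None explicitly.
        id_el = children.get("guid")
        if id_el is None:
            id_el = children.get("id")
        link_el = children.get("link")
        href = ""
        if link_el is not None:
            # Atom keeps the URL in href; RSS keeps it in the element text.
            href = link_el.attrib.get("href") or (link_el.text or "").strip()
        ident = (id_el.text or "").strip() if id_el is not None else ""
        ident = ident or href
        if ident:
            ids.append(ident)
    return ids
```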
### `github`
Polls GitHub's REST API. Auth via `GITHUB_TOKEN` / `GH_TOKEN` environment variable (or `token` in config).
| Config key | Default | Purpose |
|---|---|---|
| `repo` | — | `owner/name` (one of `repo` or `search` is required) |
| `scope` | `issues` | `issues` / `pulls` / `releases` / `commits` |
| `search` | — | GitHub issues search query (alternative to `repo`) |
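| `per_page` | `30` | Results per page |
| `token` | *(none)* | API token; takes precedence over `GITHUB_TOKEN` / `GH_TOKEN` when set |
| `max_seen` | `500` | Cap on the watermark ID set |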
## Delivery
Uses the same delivery plumbing as cron — including the new `multi` / `all` routing intents. See [cron delivery options](./cron#delivery-options) for the full list of targets.
- `--deliver-only` sends the rendered prompt verbatim, skipping the agent (zero LLM cost). Good for digests / notifications.
- Without `--deliver-only`, the prompt is handed to a short-lived agent (skills optional via `--skills`), and the agent's final response is delivered.
## Prompt template
`--prompt` supports three placeholders (unknown ones pass through verbatim):
- `{name}` — the watcher name
- `{count}` — number of new items
- `{items_json}` — JSON array of new items
If `--prompt` is omitted, a default like `"{name}: {count} new event(s) from the {provider} watcher.\n\nItems:\n{items_json}"` is used.
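
A minimal sketch of the placeholder substitution, assuming the same single-pass regex approach the engine uses. The function name `render` and its arguments are illustrative.

```python
import json
import re
from typing import Any, Dict, List


def render(template: str, name: str, items: List[Dict[str, Any]]) -> str:
    """Fill {name}, {count} and {items_json}; leave other braces alone."""
    values = {
        "name": name,
        "count": str(len(items)),
        "items_json": json.dumps(items, indent=2, default=str),
    }
    return re.sub(
        r"\{([a-zA-Z_][a-zA-Z0-9_]*)\}",
        # Unknown placeholders are returned unchanged.
        lambda m: values.get(m.group(1), m.group(0)),
        template,
    )


text = render("{count} new in {name}; keep {unknown}", "hn", [{"id": "a"}])
```

Because substitution happens in one pass, placeholder-like text inside the item data (for example a title containing `{name}`) is not expanded a second time.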
## Custom providers
Plugins can register additional providers at import time:
```python
from watchers.providers import register
def my_provider(config, watermark):
# Fetch, dedup, return (new_items, new_watermark).
return [], watermark
register("my_provider", my_provider)
```
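
A slightly fuller sketch of a provider that follows the same contract (baseline on first run, then emit only unseen IDs). The `new_files` provider and its `path` config key are hypothetical, and the registration call is left as a comment so the snippet runs outside Hermes; a real provider would also raise `ProviderError` on bad config.

```python
import os
import tempfile
from typing import Any, Dict, List, Tuple


def new_files_provider(config: Dict[str, Any], watermark: Dict[str, Any]
                       ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Emit one item per file that appeared in config['path'] since last poll."""
    directory = config["path"]
    previous = list(watermark.get("seen_ids") or [])
    seen = set(previous)
    first_run = not watermark  # empty watermark means never polled
    new_items: List[Dict[str, Any]] = []
    for fname in sorted(os.listdir(directory)):
        if fname in seen:
            continue
        seen.add(fname)
        previous.append(fname)
        if not first_run:
            new_items.append({"id": fname, "title": fname,
                              "url": os.path.join(directory, fname)})
    return new_items, {"seen_ids": previous}


# Inside a plugin this would be registered with:
#   from watchers.providers import register
#   register("new_files", new_files_provider)

# Usage: the first poll records a baseline, the second reports the new file.
with tempfile.TemporaryDirectory() as d:
    open(os.path.join(d, "a.txt"), "w").close()
    first, wm = new_files_provider({"path": d}, {})
    open(os.path.join(d, "b.txt"), "w").close()
    second, wm = new_files_provider({"path": d}, wm)
```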
## Subcommands
| Command | Purpose |
|---|---|
| `hermes watch add <name> --provider <p> ...` | Create or update a watcher |
| `hermes watch list [--verbose]` | Show all watchers and their status |
| `hermes watch remove <name>` | Delete a watcher + its watermark |
| `hermes watch run <name>` | Fire one watcher out of band (respects dedup) |
| `hermes watch reset <name>` | Clear the watermark — next run treats it as first poll |
| `hermes watch tick` | Poll every due watcher now |


@@ -63,6 +63,7 @@ const sidebars: SidebarsConfig = {
label: 'Automation',
items: [
'user-guide/features/cron',
'user-guide/features/watchers',
'user-guide/features/delegation',
'user-guide/features/kanban',
'user-guide/features/kanban-tutorial',