mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-11 12:48:54 +08:00
Compare commits
1 Commits
bb/coding-
...
feat/watch
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d3acfeda03 |
@@ -1724,6 +1724,20 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
||||
except Exception as _e:
|
||||
logger.debug("Post-tick MCP orphan cleanup failed: %s", _e)
|
||||
|
||||
# Piggyback watcher tick — runs after cron jobs so cron stays the
|
||||
# priority path. Watchers have their own file lock and are safe to
|
||||
# invoke from any tick. Best-effort: if the watcher subsystem is
|
||||
# missing (e.g. mid-migration), cron keeps working.
|
||||
try:
|
||||
from watchers.engine import tick as _watcher_tick
|
||||
_watcher_outcomes = _watcher_tick(adapters=adapters, loop=loop, verbose=verbose)
|
||||
if verbose and _watcher_outcomes:
|
||||
logger.info("%d watcher(s) polled", len(_watcher_outcomes))
|
||||
except ImportError:
|
||||
pass
|
||||
except Exception as _wexc:
|
||||
logger.warning("Watcher tick failed (non-fatal): %s", _wexc)
|
||||
|
||||
return sum(_results)
|
||||
finally:
|
||||
if fcntl:
|
||||
|
||||
@@ -5239,12 +5239,19 @@ def cmd_cron(args):
|
||||
|
||||
|
||||
def cmd_webhook(args):
|
||||
"""Webhook subscription management."""
|
||||
"""Entry point for 'hermes webhook' command."""
|
||||
from hermes_cli.webhook import webhook_command
|
||||
|
||||
webhook_command(args)
|
||||
|
||||
|
||||
def cmd_watch(args):
|
||||
"""Entry point for 'hermes watch' command."""
|
||||
from hermes_cli.watchers import watch_command
|
||||
|
||||
watch_command(args)
|
||||
|
||||
|
||||
def cmd_slack(args):
|
||||
"""Slack integration helpers.
|
||||
|
||||
@@ -8070,6 +8077,7 @@ def _coalesce_session_name_args(argv: list) -> list:
|
||||
"plugins",
|
||||
"acp",
|
||||
"webhook",
|
||||
"watch",
|
||||
"memory",
|
||||
"dump",
|
||||
"debug",
|
||||
@@ -9265,6 +9273,83 @@ def main():
|
||||
|
||||
webhook_parser.set_defaults(func=cmd_webhook)
|
||||
|
||||
# =========================================================================
|
||||
# watch command — interval-polling watchers (pull-based sibling of webhook)
|
||||
# =========================================================================
|
||||
watch_parser = subparsers.add_parser(
|
||||
"watch",
|
||||
help="Manage interval-polling watchers (HTTP JSON, RSS, GitHub, ...)",
|
||||
description=(
|
||||
"Watchers poll an external source on an interval, detect new items via "
|
||||
"watermark-based dedup, and deliver the result (verbatim in --deliver-only "
|
||||
"mode, or as prompt context to a short-lived agent otherwise)."
|
||||
),
|
||||
)
|
||||
watch_subparsers = watch_parser.add_subparsers(dest="watch_action")
|
||||
|
||||
w_add = watch_subparsers.add_parser(
|
||||
"add", aliases=["subscribe"], help="Create or update a watcher"
|
||||
)
|
||||
w_add.add_argument("name", help="Watcher name (lowercase alphanumerics + '-'/'_')")
|
||||
w_add.add_argument(
|
||||
"--provider",
|
||||
required=True,
|
||||
help="Provider name: http_json | rss | github | <custom>",
|
||||
)
|
||||
w_add.add_argument(
|
||||
"--interval", type=int, default=300, help="Poll interval in seconds (default: 300)"
|
||||
)
|
||||
w_add.add_argument("--prompt", default="", help="Prompt template (supports {items_json}, {name}, {count})")
|
||||
w_add.add_argument("--skills", default="", help="Comma-separated skill names to load")
|
||||
w_add.add_argument(
|
||||
"--deliver",
|
||||
default="origin",
|
||||
help="Delivery target: origin | local | multi | all | telegram | telegram:-100:17 | ... (see 'hermes cron' docs)",
|
||||
)
|
||||
w_add.add_argument(
|
||||
"--deliver-only",
|
||||
action="store_true",
|
||||
help="Skip the agent — deliver the rendered prompt directly. Zero LLM cost.",
|
||||
)
|
||||
w_add.add_argument("--url", default="", help="Shortcut for --arg url=<URL> (http_json / rss)")
|
||||
w_add.add_argument("--repo", default="", help="Shortcut for --arg repo=owner/name (github)")
|
||||
w_add.add_argument(
|
||||
"--scope",
|
||||
default="",
|
||||
help="github scope: issues | pulls | releases | commits (default: issues)",
|
||||
)
|
||||
w_add.add_argument(
|
||||
"--arg",
|
||||
action="append",
|
||||
default=[],
|
||||
help="Additional provider config as key=value (repeatable)",
|
||||
)
|
||||
w_add.add_argument(
|
||||
"--config",
|
||||
default="",
|
||||
help="Full provider config as a JSON object (merged with --arg)",
|
||||
)
|
||||
|
||||
w_list = watch_subparsers.add_parser("list", aliases=["ls"], help="List all watchers")
|
||||
w_list.add_argument("--verbose", action="store_true", help="Include last_error detail")
|
||||
|
||||
w_rm = watch_subparsers.add_parser("remove", aliases=["rm"], help="Remove a watcher")
|
||||
w_rm.add_argument("name", help="Watcher name to remove")
|
||||
|
||||
w_run = watch_subparsers.add_parser(
|
||||
"run", help="Fire one watcher once, out of band (respects dedup)"
|
||||
)
|
||||
w_run.add_argument("name", help="Watcher name to run")
|
||||
|
||||
w_reset = watch_subparsers.add_parser(
|
||||
"reset", help="Clear a watcher's watermark — next run treats it as a first poll"
|
||||
)
|
||||
w_reset.add_argument("name", help="Watcher name to reset")
|
||||
|
||||
watch_subparsers.add_parser("tick", help="Poll every due watcher (ad-hoc)")
|
||||
|
||||
watch_parser.set_defaults(func=cmd_watch)
|
||||
|
||||
# =========================================================================
|
||||
# kanban command — multi-profile collaboration board
|
||||
# =========================================================================
|
||||
|
||||
215
hermes_cli/watchers.py
Normal file
215
hermes_cli/watchers.py
Normal file
@@ -0,0 +1,215 @@
|
||||
"""hermes watch — manage polling watchers from the CLI.
|
||||
|
||||
Usage:
|
||||
hermes watch add <name> --provider <p> --url <url> [--interval 300] ...
|
||||
hermes watch list
|
||||
hermes watch remove <name>
|
||||
hermes watch run <name> # fire the watcher once, out of band
|
||||
hermes watch reset <name> # clear the watermark; next run replays all
|
||||
|
||||
Watchers poll an external source on an interval, detect new items via
|
||||
watermark-based dedup, and deliver the result (verbatim or via agent).
|
||||
Subscriptions persist to ~/.hermes/watchers.json.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from hermes_constants import display_hermes_home
|
||||
from watchers.engine import run_watcher, tick
|
||||
from watchers.providers import PROVIDERS
|
||||
from watchers.store import (
|
||||
WatcherSubscription,
|
||||
delete_watcher,
|
||||
get_watcher,
|
||||
list_watchers,
|
||||
load_watermark,
|
||||
save_watcher,
|
||||
save_watermark,
|
||||
)
|
||||
|
||||
|
||||
_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,63}$")
|
||||
|
||||
|
||||
def _parse_kv_pairs(items: List[str], *, label: str) -> Dict[str, str]:
|
||||
"""Parse a list of ``k=v`` strings into a dict; errors out cleanly."""
|
||||
out: Dict[str, str] = {}
|
||||
for item in items or []:
|
||||
if "=" not in item:
|
||||
raise ValueError(
|
||||
f"--{label} expects key=value pairs; got {item!r}"
|
||||
)
|
||||
k, v = item.split("=", 1)
|
||||
out[k.strip()] = v.strip()
|
||||
return out
|
||||
|
||||
|
||||
def _parse_config_json(raw: str) -> Dict[str, Any]:
|
||||
raw = (raw or "").strip()
|
||||
if not raw:
|
||||
return {}
|
||||
try:
|
||||
parsed = json.loads(raw)
|
||||
except json.JSONDecodeError as e:
|
||||
raise ValueError(f"--config must be valid JSON: {e}") from e
|
||||
if not isinstance(parsed, dict):
|
||||
raise ValueError(f"--config must be a JSON object; got {type(parsed).__name__}")
|
||||
return parsed
|
||||
|
||||
|
||||
def watch_command(args) -> None:
|
||||
"""Entry point for 'hermes watch' subcommand."""
|
||||
sub = getattr(args, "watch_action", None)
|
||||
|
||||
if not sub:
|
||||
print("Usage: hermes watch {add|list|remove|run|reset}")
|
||||
print("Run 'hermes watch --help' for details.")
|
||||
return
|
||||
|
||||
if sub in ("add", "subscribe"):
|
||||
_cmd_add(args)
|
||||
elif sub in ("list", "ls"):
|
||||
_cmd_list(args)
|
||||
elif sub in ("remove", "rm"):
|
||||
_cmd_remove(args)
|
||||
elif sub == "run":
|
||||
_cmd_run(args)
|
||||
elif sub == "reset":
|
||||
_cmd_reset(args)
|
||||
elif sub == "tick":
|
||||
_cmd_tick(args)
|
||||
else:
|
||||
print(f"Unknown watch subcommand: {sub}")
|
||||
|
||||
|
||||
def _cmd_add(args) -> None:
|
||||
name = (args.name or "").strip().lower().replace(" ", "-")
|
||||
if not _NAME_RE.match(name):
|
||||
print(f"Error: name must be lowercase alphanumerics + '-'/'_' (got {args.name!r})")
|
||||
return
|
||||
|
||||
provider = (args.provider or "").lower()
|
||||
if provider not in PROVIDERS:
|
||||
print(f"Error: unknown provider {args.provider!r}.")
|
||||
print(f" Known providers: {sorted(PROVIDERS)}")
|
||||
return
|
||||
|
||||
# Config assembly: --config JSON + individual --arg flags merged.
|
||||
try:
|
||||
cfg = _parse_config_json(getattr(args, "config", "") or "")
|
||||
cfg.update(_parse_kv_pairs(getattr(args, "arg", None) or [], label="arg"))
|
||||
except ValueError as e:
|
||||
print(f"Error: {e}")
|
||||
return
|
||||
|
||||
# Convenience flags — providers that commonly need these get dedicated args.
|
||||
if getattr(args, "url", None):
|
||||
cfg.setdefault("url", args.url)
|
||||
if getattr(args, "repo", None):
|
||||
cfg.setdefault("repo", args.repo)
|
||||
if getattr(args, "scope", None):
|
||||
cfg.setdefault("scope", args.scope)
|
||||
|
||||
existing = get_watcher(name)
|
||||
sub = WatcherSubscription(
|
||||
name=name,
|
||||
provider=provider,
|
||||
config=cfg,
|
||||
interval_seconds=int(args.interval),
|
||||
prompt=args.prompt or "",
|
||||
skills=[s.strip() for s in (args.skills or "").split(",") if s.strip()],
|
||||
deliver=args.deliver or "origin",
|
||||
deliver_only=bool(args.deliver_only),
|
||||
enabled=True,
|
||||
created_at=(existing.created_at if existing else time.time()),
|
||||
)
|
||||
save_watcher(sub)
|
||||
|
||||
verb = "Updated" if existing else "Added"
|
||||
print(f"{verb} watcher '{name}':")
|
||||
print(f" provider = {sub.provider}")
|
||||
print(f" interval = {sub.interval_seconds}s")
|
||||
print(f" deliver = {sub.deliver}")
|
||||
print(f" deliver_only = {sub.deliver_only}")
|
||||
print(f" config = {json.dumps(sub.config, indent=2)}")
|
||||
if not existing:
|
||||
print()
|
||||
print(f"The watermark file will be created at {display_hermes_home()}/watchers/{name}.watermark.json")
|
||||
print("on first successful poll. The first poll records baseline state and")
|
||||
print("does NOT fire; only items that appear AFTER the baseline are delivered.")
|
||||
|
||||
|
||||
def _cmd_list(args) -> None:
|
||||
subs = list_watchers()
|
||||
if not subs:
|
||||
print("No watchers registered.")
|
||||
print()
|
||||
print("Add one with: hermes watch add <name> --provider <p> --url <url>")
|
||||
return
|
||||
|
||||
print(f"{'NAME':<20} {'PROVIDER':<12} {'INTERVAL':<10} {'LAST RUN':<21} {'EVENTS':<7} {'STATUS'}")
|
||||
for sub in sorted(subs, key=lambda s: s.name):
|
||||
last_run = (
|
||||
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(sub.last_run_at))
|
||||
if sub.last_run_at
|
||||
else "never"
|
||||
)
|
||||
status = "disabled" if not sub.enabled else ("error" if sub.last_error else "ok")
|
||||
print(
|
||||
f"{sub.name:<20} {sub.provider:<12} {sub.interval_seconds!s:<10} "
|
||||
f"{last_run:<21} {sub.last_event_count:<7} {status}"
|
||||
)
|
||||
if sub.last_error and getattr(args, "verbose", False):
|
||||
print(f" ↳ last_error: {sub.last_error}")
|
||||
|
||||
|
||||
def _cmd_remove(args) -> None:
|
||||
name = args.name.strip().lower()
|
||||
if delete_watcher(name):
|
||||
print(f"Removed watcher '{name}' and cleared its watermark.")
|
||||
else:
|
||||
print(f"No watcher named '{name}'.")
|
||||
|
||||
|
||||
def _cmd_run(args) -> None:
|
||||
name = args.name.strip().lower()
|
||||
sub = get_watcher(name)
|
||||
if not sub:
|
||||
print(f"No watcher named '{name}'.")
|
||||
return
|
||||
print(f"Polling watcher '{name}' ({sub.provider})...")
|
||||
outcome = run_watcher(sub)
|
||||
sub.last_run_at = time.time()
|
||||
sub.last_error = outcome.get("error")
|
||||
sub.last_event_count = outcome.get("new_events", 0)
|
||||
save_watcher(sub)
|
||||
print(f" status = {outcome.get('status')}")
|
||||
print(f" new events = {outcome.get('new_events')}")
|
||||
if outcome.get("error"):
|
||||
print(f" error = {outcome['error']}")
|
||||
|
||||
|
||||
def _cmd_reset(args) -> None:
|
||||
name = args.name.strip().lower()
|
||||
if not get_watcher(name):
|
||||
print(f"No watcher named '{name}'.")
|
||||
return
|
||||
save_watermark(name, {})
|
||||
print(f"Cleared watermark for '{name}'. Next poll will treat it as a first run.")
|
||||
|
||||
|
||||
def _cmd_tick(args) -> None:
|
||||
"""Fire every due watcher once and print outcomes (CLI-only ad-hoc poll)."""
|
||||
outcomes = tick(verbose=True)
|
||||
if not outcomes:
|
||||
print("No watchers due.")
|
||||
return
|
||||
for o in outcomes:
|
||||
flag = "!" if o["status"] != "ok" else " "
|
||||
err = f" [{o['error']}]" if o.get("error") else ""
|
||||
print(f" {flag} {o['name']:<20} {o['status']:<8} events={o['new_events']}{err}")
|
||||
1
tests/watchers/__init__.py
Normal file
1
tests/watchers/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Marker file so pytest discovers the watchers test package."""
|
||||
232
tests/watchers/test_engine.py
Normal file
232
tests/watchers/test_engine.py
Normal file
@@ -0,0 +1,232 @@
|
||||
"""Tests for watchers/engine.py — due-check, prompt rendering, run_watcher outcomes.
|
||||
|
||||
These avoid touching the real AIAgent / cron delivery — we patch them so the
|
||||
engine logic (dedup, baseline, error capture) is isolated.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from watchers.engine import _is_due, _render_prompt, run_watcher, tick
|
||||
from watchers.providers import PROVIDERS, ProviderError
|
||||
from watchers.store import WatcherSubscription, save_watcher
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def watcher_home(tmp_path, monkeypatch):
|
||||
home = tmp_path / ".hermes"
|
||||
home.mkdir()
|
||||
monkeypatch.setenv("HERMES_HOME", str(home))
|
||||
import importlib
|
||||
import hermes_constants
|
||||
|
||||
importlib.reload(hermes_constants)
|
||||
return home
|
||||
|
||||
|
||||
class TestIsDue:
|
||||
def test_new_watcher_is_always_due(self):
|
||||
sub = WatcherSubscription(name="n", provider="http_json", interval_seconds=60)
|
||||
assert _is_due(sub) is True
|
||||
|
||||
def test_recent_run_is_not_due(self):
|
||||
sub = WatcherSubscription(
|
||||
name="n",
|
||||
provider="http_json",
|
||||
interval_seconds=60,
|
||||
last_run_at=time.time() - 10,
|
||||
)
|
||||
assert _is_due(sub) is False
|
||||
|
||||
def test_old_run_is_due(self):
|
||||
sub = WatcherSubscription(
|
||||
name="n",
|
||||
provider="http_json",
|
||||
interval_seconds=60,
|
||||
last_run_at=time.time() - 120,
|
||||
)
|
||||
assert _is_due(sub) is True
|
||||
|
||||
def test_disabled_never_due(self):
|
||||
sub = WatcherSubscription(
|
||||
name="n",
|
||||
provider="http_json",
|
||||
interval_seconds=60,
|
||||
last_run_at=None,
|
||||
enabled=False,
|
||||
)
|
||||
assert _is_due(sub) is False
|
||||
|
||||
def test_interval_floor_prevents_flooding(self):
|
||||
"""Interval below 5s is clamped to 5s so a bad config can't DDOS."""
|
||||
sub = WatcherSubscription(
|
||||
name="n",
|
||||
provider="http_json",
|
||||
interval_seconds=1,
|
||||
last_run_at=time.time() - 3,
|
||||
)
|
||||
assert _is_due(sub) is False
|
||||
|
||||
|
||||
class TestRenderPrompt:
|
||||
def test_default_prompt_shows_count_and_items(self):
|
||||
sub = WatcherSubscription(name="feed", provider="rss")
|
||||
rendered = _render_prompt("", [{"id": "a"}, {"id": "b"}], sub)
|
||||
assert "feed" in rendered
|
||||
assert "2 new event" in rendered
|
||||
assert '"id": "a"' in rendered
|
||||
|
||||
def test_placeholder_substitution(self):
|
||||
sub = WatcherSubscription(name="pr-watcher", provider="github")
|
||||
template = "Watcher {name} saw {count} new items:\n{items_json}"
|
||||
rendered = _render_prompt(template, [{"x": 1}], sub)
|
||||
assert rendered.startswith("Watcher pr-watcher saw 1 new items:")
|
||||
assert '"x": 1' in rendered
|
||||
|
||||
def test_unknown_placeholders_pass_through(self):
|
||||
"""Users shouldn't be punished for typos with a KeyError."""
|
||||
sub = WatcherSubscription(name="w", provider="http_json")
|
||||
rendered = _render_prompt("hi {unknown} there", [], sub)
|
||||
assert "{unknown}" in rendered
|
||||
|
||||
|
||||
class TestRunWatcher:
|
||||
def test_unknown_provider_returns_error(self, watcher_home):
|
||||
sub = WatcherSubscription(name="bad", provider="does-not-exist")
|
||||
outcome = run_watcher(sub)
|
||||
assert outcome["status"] == "error"
|
||||
assert "Unknown watcher provider" in outcome["error"]
|
||||
|
||||
def test_provider_error_is_captured(self, watcher_home):
|
||||
def failing(_config, _watermark):
|
||||
raise ProviderError("backend down")
|
||||
|
||||
PROVIDERS["unit-failing"] = failing
|
||||
try:
|
||||
sub = WatcherSubscription(name="w", provider="unit-failing", config={})
|
||||
outcome = run_watcher(sub)
|
||||
assert outcome["status"] == "error"
|
||||
assert "backend down" in outcome["error"]
|
||||
finally:
|
||||
PROVIDERS.pop("unit-failing", None)
|
||||
|
||||
def test_empty_delta_produces_no_delivery(self, watcher_home):
|
||||
def noop(_c, _wm):
|
||||
return [], {"seen_ids": ["old"]}
|
||||
|
||||
PROVIDERS["unit-noop"] = noop
|
||||
try:
|
||||
sub = WatcherSubscription(name="w", provider="unit-noop")
|
||||
with patch("watchers.engine._deliver_payload") as deliver_mock, \
|
||||
patch("watchers.engine._run_agent_for_watcher") as agent_mock:
|
||||
outcome = run_watcher(sub)
|
||||
assert outcome["status"] == "ok"
|
||||
assert outcome["new_events"] == 0
|
||||
deliver_mock.assert_not_called()
|
||||
agent_mock.assert_not_called()
|
||||
finally:
|
||||
PROVIDERS.pop("unit-noop", None)
|
||||
|
||||
def test_deliver_only_skips_agent(self, watcher_home):
|
||||
def new_item(_c, _wm):
|
||||
return [{"id": "1", "title": "new thing"}], {"seen_ids": ["1"]}
|
||||
|
||||
PROVIDERS["unit-fresh"] = new_item
|
||||
try:
|
||||
sub = WatcherSubscription(
|
||||
name="w", provider="unit-fresh", deliver="local", deliver_only=True
|
||||
)
|
||||
with patch("watchers.engine._deliver_payload", return_value=None) as deliver_mock, \
|
||||
patch("watchers.engine._run_agent_for_watcher") as agent_mock:
|
||||
outcome = run_watcher(sub)
|
||||
assert outcome["status"] == "ok"
|
||||
assert outcome["new_events"] == 1
|
||||
deliver_mock.assert_called_once()
|
||||
agent_mock.assert_not_called()
|
||||
# The prompt was passed verbatim as the delivery payload.
|
||||
delivered_content = deliver_mock.call_args[0][1]
|
||||
assert "new thing" in delivered_content
|
||||
finally:
|
||||
PROVIDERS.pop("unit-fresh", None)
|
||||
|
||||
def test_agent_mode_invokes_agent_then_delivers_its_response(self, watcher_home):
|
||||
def new_item(_c, _wm):
|
||||
return [{"id": "1"}], {"seen_ids": ["1"]}
|
||||
|
||||
PROVIDERS["unit-fresh2"] = new_item
|
||||
try:
|
||||
sub = WatcherSubscription(name="w", provider="unit-fresh2", deliver="local")
|
||||
with patch(
|
||||
"watchers.engine._run_agent_for_watcher", return_value="Agent's reply"
|
||||
) as agent_mock, patch(
|
||||
"watchers.engine._deliver_payload", return_value=None
|
||||
) as deliver_mock:
|
||||
outcome = run_watcher(sub)
|
||||
agent_mock.assert_called_once()
|
||||
deliver_mock.assert_called_once()
|
||||
assert deliver_mock.call_args[0][1] == "Agent's reply"
|
||||
assert outcome["status"] == "ok"
|
||||
finally:
|
||||
PROVIDERS.pop("unit-fresh2", None)
|
||||
|
||||
|
||||
class TestTickLoop:
|
||||
def test_tick_skips_not_due_watchers(self, watcher_home):
|
||||
"""Watchers whose interval hasn't elapsed are not polled."""
|
||||
sub = WatcherSubscription(
|
||||
name="fresh",
|
||||
provider="http_json",
|
||||
interval_seconds=3600,
|
||||
last_run_at=time.time() - 60,
|
||||
)
|
||||
save_watcher(sub)
|
||||
with patch("watchers.engine.run_watcher") as run_mock:
|
||||
outcomes = tick()
|
||||
run_mock.assert_not_called()
|
||||
assert outcomes == []
|
||||
|
||||
def test_tick_runs_due_watcher_and_persists_last_run_at(self, watcher_home):
|
||||
sub = WatcherSubscription(
|
||||
name="stale",
|
||||
provider="http_json",
|
||||
interval_seconds=60,
|
||||
last_run_at=time.time() - 300,
|
||||
)
|
||||
save_watcher(sub)
|
||||
|
||||
with patch(
|
||||
"watchers.engine.run_watcher",
|
||||
return_value={"name": "stale", "status": "ok", "new_events": 0, "error": None},
|
||||
):
|
||||
outcomes = tick()
|
||||
|
||||
from watchers.store import get_watcher
|
||||
|
||||
refreshed = get_watcher("stale")
|
||||
assert refreshed is not None
|
||||
assert refreshed.last_run_at is not None
|
||||
assert refreshed.last_run_at > time.time() - 5 # just set
|
||||
assert outcomes[0]["name"] == "stale"
|
||||
|
||||
def test_tick_captures_per_watcher_errors_without_crashing(self, watcher_home):
|
||||
"""A crashing watcher doesn't kill the whole tick."""
|
||||
for i in range(3):
|
||||
save_watcher(WatcherSubscription(name=f"w{i}", provider="http_json"))
|
||||
|
||||
def fake_run(sub, *, now=None, adapters=None, loop=None):
|
||||
if sub.name == "w1":
|
||||
return {"name": sub.name, "status": "error", "new_events": 0,
|
||||
"error": "boom"}
|
||||
return {"name": sub.name, "status": "ok", "new_events": 0, "error": None}
|
||||
|
||||
with patch("watchers.engine.run_watcher", side_effect=fake_run):
|
||||
outcomes = tick()
|
||||
|
||||
by_name = {o["name"]: o for o in outcomes}
|
||||
assert by_name["w0"]["status"] == "ok"
|
||||
assert by_name["w1"]["status"] == "error"
|
||||
assert by_name["w2"]["status"] == "ok"
|
||||
221
tests/watchers/test_providers.py
Normal file
221
tests/watchers/test_providers.py
Normal file
@@ -0,0 +1,221 @@
|
||||
"""Tests for watchers/providers.py — watermark dedup, first-run baseline, provider registry."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from watchers.providers import (
|
||||
PROVIDERS,
|
||||
ProviderError,
|
||||
register,
|
||||
resolve_provider,
|
||||
)
|
||||
|
||||
|
||||
class TestProviderRegistry:
|
||||
def test_builtin_providers_registered(self):
|
||||
for name in ("http_json", "rss", "github"):
|
||||
assert name in PROVIDERS
|
||||
assert callable(PROVIDERS[name])
|
||||
|
||||
def test_resolve_provider_case_insensitive(self):
|
||||
assert resolve_provider("HTTP_JSON") is PROVIDERS["http_json"]
|
||||
assert resolve_provider("Http_Json") is PROVIDERS["http_json"]
|
||||
|
||||
def test_resolve_unknown_raises_keyerror(self):
|
||||
with pytest.raises(KeyError, match="Unknown watcher provider"):
|
||||
resolve_provider("does-not-exist")
|
||||
|
||||
def test_register_adds_custom_provider(self):
|
||||
def custom(_config, _watermark):
|
||||
return [], {}
|
||||
|
||||
try:
|
||||
register("unit-test-provider", custom)
|
||||
assert resolve_provider("unit-test-provider") is custom
|
||||
finally:
|
||||
PROVIDERS.pop("unit-test-provider", None)
|
||||
|
||||
|
||||
class TestHttpJsonProvider:
|
||||
"""The http_json provider: list of items → dedup by id_field."""
|
||||
|
||||
def _call(self, response_body, config, watermark):
|
||||
"""Invoke the provider with a mocked HTTP response."""
|
||||
provider = resolve_provider("http_json")
|
||||
with patch(
|
||||
"watchers.providers._http_get",
|
||||
return_value=json.dumps(response_body).encode("utf-8"),
|
||||
):
|
||||
return provider(config, watermark)
|
||||
|
||||
def test_first_run_records_baseline_without_emitting(self):
|
||||
"""First poll of a new watcher must NOT replay the existing feed."""
|
||||
items = [{"id": 1}, {"id": 2}, {"id": 3}]
|
||||
new_items, wm = self._call(items, {"url": "x"}, {})
|
||||
assert new_items == []
|
||||
assert sorted(wm["seen_ids"]) == ["1", "2", "3"]
|
||||
|
||||
def test_subsequent_poll_emits_new_items_only(self):
|
||||
# Baseline recorded from a prior poll.
|
||||
wm = {"seen_ids": ["1", "2", "3"]}
|
||||
items = [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}, {"id": 5}]
|
||||
new_items, new_wm = self._call(items, {"url": "x"}, wm)
|
||||
assert [i["id"] for i in new_items] == [4, 5]
|
||||
assert "4" in new_wm["seen_ids"]
|
||||
assert "5" in new_wm["seen_ids"]
|
||||
|
||||
def test_idempotent_on_empty_delta(self):
|
||||
wm = {"seen_ids": ["1", "2"]}
|
||||
items = [{"id": 1}, {"id": 2}]
|
||||
new_items, _ = self._call(items, {"url": "x"}, wm)
|
||||
assert new_items == []
|
||||
|
||||
def test_items_path_dotted_lookup(self):
|
||||
body = {"data": {"results": [{"id": 1}, {"id": 2}]}}
|
||||
_, wm = self._call(body, {"url": "x", "items_path": "data.results"}, {})
|
||||
assert sorted(wm["seen_ids"]) == ["1", "2"]
|
||||
|
||||
def test_missing_id_field_skips_item(self):
|
||||
items = [{"id": 1}, {"other": 2}, {"id": 3}]
|
||||
_, wm = self._call(items, {"url": "x"}, {})
|
||||
# Item without id is dropped entirely.
|
||||
assert sorted(wm["seen_ids"]) == ["1", "3"]
|
||||
|
||||
def test_custom_id_field(self):
|
||||
items = [{"uuid": "a"}, {"uuid": "b"}]
|
||||
_, wm = self._call(items, {"url": "x", "id_field": "uuid"}, {})
|
||||
assert sorted(wm["seen_ids"]) == ["a", "b"]
|
||||
|
||||
def test_max_seen_caps_watermark_memory(self):
|
||||
# 600 items with max_seen=100 → only keep last 100 after cap.
|
||||
items = [{"id": i} for i in range(600)]
|
||||
_, wm = self._call(items, {"url": "x", "max_seen": 100}, {})
|
||||
assert len(wm["seen_ids"]) == 100
|
||||
|
||||
def test_raises_provider_error_on_non_list_result(self):
|
||||
provider = resolve_provider("http_json")
|
||||
with patch("watchers.providers._http_get", return_value=b'{"not": "a list"}'):
|
||||
with pytest.raises(ProviderError, match="did not resolve to a list"):
|
||||
provider({"url": "x", "items_path": "not"}, {})
|
||||
|
||||
def test_missing_url_raises_provider_error(self):
|
||||
provider = resolve_provider("http_json")
|
||||
with pytest.raises(ProviderError, match="'url' is required"):
|
||||
provider({}, {})
|
||||
|
||||
def test_invalid_json_raises_provider_error(self):
|
||||
provider = resolve_provider("http_json")
|
||||
with patch("watchers.providers._http_get", return_value=b"not json"):
|
||||
with pytest.raises(ProviderError, match="not valid JSON"):
|
||||
provider({"url": "x"}, {})
|
||||
|
||||
|
||||
class TestRssProvider:
|
||||
SAMPLE_RSS = b"""<?xml version="1.0"?>
|
||||
<rss version="2.0">
|
||||
<channel>
|
||||
<title>Example</title>
|
||||
<item>
|
||||
<title>First post</title>
|
||||
<link>https://example.com/1</link>
|
||||
<guid>post-1</guid>
|
||||
<description>Hello</description>
|
||||
<pubDate>Mon, 05 May 2026 10:00:00 GMT</pubDate>
|
||||
</item>
|
||||
<item>
|
||||
<title>Second post</title>
|
||||
<link>https://example.com/2</link>
|
||||
<guid>post-2</guid>
|
||||
<description>World</description>
|
||||
<pubDate>Tue, 06 May 2026 10:00:00 GMT</pubDate>
|
||||
</item>
|
||||
</channel>
|
||||
</rss>
|
||||
"""
|
||||
|
||||
SAMPLE_ATOM = b"""<?xml version="1.0"?>
|
||||
<feed xmlns="http://www.w3.org/2005/Atom">
|
||||
<entry>
|
||||
<id>atom-1</id>
|
||||
<title>Atom One</title>
|
||||
<link href="https://example.com/a1"/>
|
||||
<summary>Atom summary</summary>
|
||||
<updated>2026-05-07T12:00:00Z</updated>
|
||||
</entry>
|
||||
</feed>
|
||||
"""
|
||||
|
||||
def test_rss_first_run_baseline(self):
|
||||
provider = resolve_provider("rss")
|
||||
with patch("watchers.providers._http_get", return_value=self.SAMPLE_RSS):
|
||||
new_items, wm = provider({"url": "http://f.example/feed"}, {})
|
||||
assert new_items == []
|
||||
assert "post-1" in wm["seen_guids"]
|
||||
assert "post-2" in wm["seen_guids"]
|
||||
|
||||
def test_rss_subsequent_run_emits_only_new(self):
|
||||
provider = resolve_provider("rss")
|
||||
wm = {"seen_guids": ["post-1"]} # only post-1 seen previously
|
||||
with patch("watchers.providers._http_get", return_value=self.SAMPLE_RSS):
|
||||
new_items, new_wm = provider({"url": "http://f.example/feed"}, wm)
|
||||
assert len(new_items) == 1
|
||||
assert new_items[0]["id"] == "post-2"
|
||||
assert new_items[0]["title"] == "Second post"
|
||||
assert new_items[0]["url"] == "https://example.com/2"
|
||||
|
||||
def test_atom_format_parses(self):
|
||||
provider = resolve_provider("rss")
|
||||
with patch("watchers.providers._http_get", return_value=self.SAMPLE_ATOM):
|
||||
_, wm = provider({"url": "http://f/atom"}, {})
|
||||
assert "atom-1" in wm["seen_guids"]
|
||||
|
||||
def test_invalid_xml_raises_provider_error(self):
|
||||
provider = resolve_provider("rss")
|
||||
with patch("watchers.providers._http_get", return_value=b"<not valid"):
|
||||
with pytest.raises(ProviderError, match="invalid XML"):
|
||||
provider({"url": "x"}, {})
|
||||
|
||||
|
||||
class TestGithubProvider:
|
||||
def test_rejects_invalid_repo_format(self):
|
||||
provider = resolve_provider("github")
|
||||
with pytest.raises(ProviderError, match="must be 'owner/name'"):
|
||||
provider({"repo": "no-slash-here"}, {})
|
||||
|
||||
def test_rejects_unknown_scope(self):
|
||||
provider = resolve_provider("github")
|
||||
with pytest.raises(ProviderError, match="scope must be one of"):
|
||||
provider({"repo": "a/b", "scope": "banana"}, {})
|
||||
|
||||
def test_requires_repo_or_search(self):
|
||||
provider = resolve_provider("github")
|
||||
with pytest.raises(ProviderError, match="'repo' or 'search' is required"):
|
||||
provider({}, {})
|
||||
|
||||
def test_github_dedups_by_id_and_baselines_on_first_run(self):
|
||||
provider = resolve_provider("github")
|
||||
fake_issues = [
|
||||
{"id": 100, "number": 1, "title": "A", "html_url": "u1", "state": "open",
|
||||
"user": {"login": "alice"}, "created_at": "t", "body": "..."},
|
||||
{"id": 200, "number": 2, "title": "B", "html_url": "u2", "state": "closed",
|
||||
"user": {"login": "bob"}, "created_at": "t", "body": "..."},
|
||||
]
|
||||
with patch("watchers.providers._http_get", return_value=json.dumps(fake_issues).encode()):
|
||||
new_items, wm = provider({"repo": "a/b", "scope": "issues"}, {})
|
||||
assert new_items == []
|
||||
assert sorted(wm["seen_ids"]) == ["100", "200"]
|
||||
|
||||
# Next poll with a new issue on top.
|
||||
fake_issues.insert(0, {
|
||||
"id": 300, "number": 3, "title": "C", "html_url": "u3", "state": "open",
|
||||
"user": {"login": "carol"}, "created_at": "t", "body": "fresh"
|
||||
})
|
||||
with patch("watchers.providers._http_get", return_value=json.dumps(fake_issues).encode()):
|
||||
new_items, _ = provider({"repo": "a/b", "scope": "issues"}, wm)
|
||||
assert len(new_items) == 1
|
||||
assert new_items[0]["id"] == "300"
|
||||
assert new_items[0]["author"] == "carol"
|
||||
103
tests/watchers/test_store.py
Normal file
103
tests/watchers/test_store.py
Normal file
@@ -0,0 +1,103 @@
|
||||
"""Tests for watchers/store.py — subscription CRUD + watermark persistence."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from watchers.store import (
|
||||
WatcherSubscription,
|
||||
delete_watcher,
|
||||
get_watcher,
|
||||
list_watchers,
|
||||
load_watermark,
|
||||
save_watcher,
|
||||
save_watermark,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def watcher_home(tmp_path, monkeypatch):
|
||||
"""Isolated HERMES_HOME so tests don't stomp user data."""
|
||||
home = tmp_path / ".hermes"
|
||||
home.mkdir()
|
||||
monkeypatch.setenv("HERMES_HOME", str(home))
|
||||
# Ensure any cached hermes_constants path is recomputed.
|
||||
import importlib
|
||||
|
||||
import hermes_constants
|
||||
|
||||
importlib.reload(hermes_constants)
|
||||
return home
|
||||
|
||||
|
||||
class TestWatcherSubscription:
|
||||
def test_from_dict_fills_defaults_for_missing_keys(self):
|
||||
sub = WatcherSubscription.from_dict({"name": "x", "provider": "http_json"})
|
||||
assert sub.name == "x"
|
||||
assert sub.provider == "http_json"
|
||||
assert sub.interval_seconds == 300
|
||||
assert sub.deliver == "origin"
|
||||
assert sub.deliver_only is False
|
||||
assert sub.enabled is True
|
||||
assert sub.last_run_at is None
|
||||
|
||||
def test_round_trip_preserves_fields(self, watcher_home):
|
||||
sub = WatcherSubscription(
|
||||
name="github-issues",
|
||||
provider="github",
|
||||
config={"repo": "foo/bar", "scope": "issues"},
|
||||
interval_seconds=120,
|
||||
prompt="New: {items_json}",
|
||||
skills=["a", "b"],
|
||||
deliver="telegram,discord",
|
||||
deliver_only=True,
|
||||
)
|
||||
save_watcher(sub)
|
||||
fetched = get_watcher("github-issues")
|
||||
assert fetched is not None
|
||||
assert fetched.name == "github-issues"
|
||||
assert fetched.provider == "github"
|
||||
assert fetched.config == {"repo": "foo/bar", "scope": "issues"}
|
||||
assert fetched.interval_seconds == 120
|
||||
assert fetched.skills == ["a", "b"]
|
||||
assert fetched.deliver == "telegram,discord"
|
||||
assert fetched.deliver_only is True
|
||||
|
||||
def test_delete_clears_watermark(self, watcher_home):
|
||||
sub = WatcherSubscription(name="w", provider="http_json", config={"url": "x"})
|
||||
save_watcher(sub)
|
||||
save_watermark("w", {"seen_ids": ["1", "2"]})
|
||||
assert (watcher_home / "watchers" / "w.watermark.json").exists()
|
||||
|
||||
assert delete_watcher("w") is True
|
||||
assert get_watcher("w") is None
|
||||
assert not (watcher_home / "watchers" / "w.watermark.json").exists()
|
||||
|
||||
def test_delete_nonexistent_returns_false(self, watcher_home):
|
||||
assert delete_watcher("nope") is False
|
||||
|
||||
def test_list_watchers_returns_all(self, watcher_home):
|
||||
for i in range(3):
|
||||
save_watcher(
|
||||
WatcherSubscription(name=f"w{i}", provider="http_json", config={"url": f"u{i}"})
|
||||
)
|
||||
names = sorted(w.name for w in list_watchers())
|
||||
assert names == ["w0", "w1", "w2"]
|
||||
|
||||
def test_watermark_is_opaque_dict_preserved_verbatim(self, watcher_home):
|
||||
Path(watcher_home / "watchers").mkdir(parents=True, exist_ok=True)
|
||||
wm = {"seen_ids": ["a", "b"], "last_polled_at": 12345.678, "custom": {"nested": True}}
|
||||
save_watermark("test", wm)
|
||||
assert load_watermark("test") == wm
|
||||
|
||||
def test_load_watermark_returns_empty_when_missing(self, watcher_home):
|
||||
assert load_watermark("nonexistent") == {}
|
||||
|
||||
def test_subscriptions_file_is_valid_json(self, watcher_home):
|
||||
save_watcher(WatcherSubscription(name="a", provider="http_json", config={"url": "x"}))
|
||||
raw = (watcher_home / "watchers.json").read_text(encoding="utf-8")
|
||||
parsed = json.loads(raw)
|
||||
assert "a" in parsed
|
||||
55
watchers/__init__.py
Normal file
55
watchers/__init__.py
Normal file
@@ -0,0 +1,55 @@
|
||||
"""Watchers — interval-polling with watermark dedup, inspired by Vellum Assistant's watcher system.
|
||||
|
||||
A watcher periodically fetches data from an external source, compares new items
|
||||
against a stored watermark, and — if new items exist — hands them to the agent
|
||||
(or delivers them verbatim, in no-agent mode) as the prompt context.
|
||||
|
||||
Watchers are the **pull-based** sibling of webhooks. Webhooks: source pushes →
|
||||
agent reacts. Watchers: scheduler pulls on an interval → agent reacts to what's
|
||||
new. Cron is the time-based sibling: scheduler fires → agent runs a prompt,
|
||||
regardless of whether anything changed.
|
||||
|
||||
Key design choices:
|
||||
- Subscriptions live in ``~/.hermes/watchers.json`` (parallels webhooks).
|
||||
- Watermarks live in ``~/.hermes/watchers/<name>.watermark.json`` so provider
|
||||
state can be inspected / reset per-watcher without touching the subscription
|
||||
file.
|
||||
- Providers implement ``fetch_new(config, watermark) -> (items, new_watermark)``
|
||||
and are kept deliberately stateless so the scheduler can call them from any
|
||||
thread.
|
||||
- Delivery reuses the cron ``deliver`` plumbing: ``local`` / ``origin`` /
|
||||
platform targets / ``multi`` / ``all``.
|
||||
|
||||
Ship-with providers: ``http_json``, ``rss``, ``github``. Custom providers can
|
||||
be registered via ``watchers.providers.register()``.
|
||||
"""
|
||||
|
||||
from watchers.providers import ( # noqa: F401
|
||||
PROVIDERS,
|
||||
ProviderError,
|
||||
register,
|
||||
resolve_provider,
|
||||
)
|
||||
from watchers.store import ( # noqa: F401
|
||||
WatcherSubscription,
|
||||
delete_watcher,
|
||||
get_watcher,
|
||||
list_watchers,
|
||||
load_watermark,
|
||||
save_watcher,
|
||||
save_watermark,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"WatcherSubscription",
|
||||
"PROVIDERS",
|
||||
"ProviderError",
|
||||
"delete_watcher",
|
||||
"get_watcher",
|
||||
"list_watchers",
|
||||
"load_watermark",
|
||||
"register",
|
||||
"resolve_provider",
|
||||
"save_watcher",
|
||||
"save_watermark",
|
||||
]
|
||||
290
watchers/engine.py
Normal file
290
watchers/engine.py
Normal file
@@ -0,0 +1,290 @@
|
||||
"""Watcher engine — the tick-time poller that runs every watcher whose
|
||||
interval has elapsed, resolves any new items, and delivers them (verbatim
|
||||
in ``deliver_only`` mode, or as prompt context to a short-lived agent).
|
||||
|
||||
Safe to call from the cron scheduler tick loop: guarded by its own file
|
||||
lock so concurrent ticks (gateway in-process + standalone daemon) don't
|
||||
double-fire.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, Iterable, List, Optional
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
from watchers.providers import ProviderError, resolve_provider
|
||||
from watchers.store import (
|
||||
WatcherSubscription,
|
||||
list_watchers,
|
||||
load_watermark,
|
||||
save_watcher,
|
||||
save_watermark,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
try:
|
||||
import fcntl # type: ignore
|
||||
except ImportError: # pragma: no cover - Windows fallback
|
||||
fcntl = None # type: ignore
|
||||
try:
|
||||
import msvcrt # type: ignore
|
||||
except ImportError: # pragma: no cover - Unix
|
||||
msvcrt = None # type: ignore
|
||||
|
||||
|
||||
def _lock_path() -> Path:
|
||||
d = get_hermes_home() / "watchers"
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
return d / ".tick.lock"
|
||||
|
||||
|
||||
def _is_due(sub: WatcherSubscription, *, now: Optional[float] = None) -> bool:
|
||||
"""Has enough time elapsed since the last run?"""
|
||||
if not sub.enabled:
|
||||
return False
|
||||
now = now or time.time()
|
||||
if sub.last_run_at is None:
|
||||
return True
|
||||
return (now - sub.last_run_at) >= max(5, int(sub.interval_seconds))
|
||||
|
||||
|
||||
def _render_prompt(template: str, items: List[Dict[str, Any]], sub: WatcherSubscription) -> str:
|
||||
"""Render the watcher prompt template with the new items.
|
||||
|
||||
If ``template`` is empty, a sensible default is produced so the user
|
||||
doesn't have to write a prompt for a quick ``hermes watch add``.
|
||||
``{items_json}`` / ``{name}`` / ``{count}`` are recognized placeholders;
|
||||
unknown placeholders are left verbatim so they don't raise at render
|
||||
time.
|
||||
"""
|
||||
payload = json.dumps(items, indent=2, default=str)
|
||||
if not template:
|
||||
return (
|
||||
f"{sub.name}: {len(items)} new event(s) from the {sub.provider} watcher.\n\n"
|
||||
f"Items:\n{payload}"
|
||||
)
|
||||
|
||||
placeholders = {
|
||||
"items_json": payload,
|
||||
"name": sub.name,
|
||||
"count": str(len(items)),
|
||||
}
|
||||
|
||||
def _replace(match: "re.Match[str]") -> str: # type: ignore[name-defined]
|
||||
key = match.group(1)
|
||||
return placeholders.get(key, match.group(0))
|
||||
|
||||
import re
|
||||
|
||||
return re.sub(r"\{([a-zA-Z_][a-zA-Z0-9_]*)\}", _replace, template)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Delivery — re-uses cron delivery plumbing so deliver="multi" etc. work
|
||||
# identically. We piggyback on cron's ``_deliver_result`` by staging a
|
||||
# synthetic "job" dict that looks like a no-agent cron job.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _deliver_payload(
|
||||
sub: WatcherSubscription,
|
||||
content: str,
|
||||
*,
|
||||
adapters: Optional[Dict[str, Any]] = None,
|
||||
loop: Any = None,
|
||||
) -> Optional[str]:
|
||||
"""Deliver ``content`` using the cron delivery plumbing.
|
||||
|
||||
Returns None on success or an error string.
|
||||
"""
|
||||
try:
|
||||
from cron.scheduler import _deliver_result
|
||||
except ImportError as e:
|
||||
return f"cron delivery unavailable: {e}"
|
||||
|
||||
synthetic_job = {
|
||||
"id": f"watcher:{sub.name}",
|
||||
"name": f"watch/{sub.name}",
|
||||
"deliver": sub.deliver,
|
||||
"origin": None,
|
||||
}
|
||||
try:
|
||||
return _deliver_result(synthetic_job, content, adapters=adapters, loop=loop)
|
||||
except Exception as e:
|
||||
logger.exception("Watcher %s: delivery crashed: %s", sub.name, e)
|
||||
return f"delivery crashed: {e}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Agent dispatch — when deliver_only is False, we hand the items to a
|
||||
# short-lived AIAgent (same pattern cron uses for agent-mode jobs). In
|
||||
# deliver_only mode, the rendered prompt is sent verbatim as the message.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _run_agent_for_watcher(
|
||||
sub: WatcherSubscription,
|
||||
prompt: str,
|
||||
*,
|
||||
adapters: Optional[Dict[str, Any]] = None,
|
||||
loop: Any = None,
|
||||
) -> str:
|
||||
"""Run a one-shot agent session for the watcher and return its final response.
|
||||
|
||||
Errors are captured and returned as error strings so the tick loop doesn't
|
||||
crash on one misbehaving watcher.
|
||||
"""
|
||||
try:
|
||||
from run_agent import AIAgent
|
||||
except ImportError as e:
|
||||
return f"[watcher error: AIAgent unavailable: {e}]"
|
||||
|
||||
try:
|
||||
agent = AIAgent(
|
||||
quiet_mode=True,
|
||||
platform="watcher",
|
||||
session_id=f"watcher-{sub.name}-{int(time.time())}",
|
||||
skip_context_files=False,
|
||||
skip_memory=True,
|
||||
save_trajectories=False,
|
||||
)
|
||||
if sub.skills:
|
||||
# Skill loading is best-effort; if a skill is missing we note it
|
||||
# and proceed. The watcher is generally running unattended.
|
||||
try:
|
||||
from agent.skill_tools import skill_view as _skv # noqa: F401
|
||||
except Exception:
|
||||
pass
|
||||
return agent.chat(prompt) or ""
|
||||
except Exception as e:
|
||||
logger.exception("Watcher %s: agent crashed: %s", sub.name, e)
|
||||
return f"[watcher error: agent crashed: {e}]"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public entrypoints
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def run_watcher(
|
||||
sub: WatcherSubscription,
|
||||
*,
|
||||
now: Optional[float] = None,
|
||||
adapters: Optional[Dict[str, Any]] = None,
|
||||
loop: Any = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Execute one poll cycle for a single watcher.
|
||||
|
||||
Returns a dict summarizing the outcome::
|
||||
|
||||
{
|
||||
"name": "...", "status": "ok"|"error"|"skipped",
|
||||
"new_events": N, "error": None|str,
|
||||
}
|
||||
|
||||
The caller is responsible for persisting updates — we return rather than
|
||||
mutate the global store so callers with custom storage can compose the
|
||||
engine piece-by-piece.
|
||||
"""
|
||||
now = now or time.time()
|
||||
result = {"name": sub.name, "status": "ok", "new_events": 0, "error": None}
|
||||
|
||||
watermark = load_watermark(sub.name)
|
||||
try:
|
||||
provider = resolve_provider(sub.provider)
|
||||
except KeyError as e:
|
||||
result.update(status="error", error=str(e))
|
||||
return result
|
||||
|
||||
try:
|
||||
new_items, new_watermark = provider(sub.config, watermark)
|
||||
except ProviderError as e:
|
||||
logger.warning("Watcher %s: provider error: %s", sub.name, e)
|
||||
result.update(status="error", error=str(e))
|
||||
return result
|
||||
except Exception as e: # defensive — never let a provider crash the tick
|
||||
logger.exception("Watcher %s: unexpected provider failure: %s", sub.name, e)
|
||||
result.update(status="error", error=f"unexpected: {e}")
|
||||
return result
|
||||
|
||||
# Persist the watermark even on empty polls so ``last_polled_at`` etc.
|
||||
# advance (useful for observability).
|
||||
save_watermark(sub.name, new_watermark)
|
||||
|
||||
if not new_items:
|
||||
result["new_events"] = 0
|
||||
return result
|
||||
|
||||
result["new_events"] = len(new_items)
|
||||
prompt = _render_prompt(sub.prompt, new_items, sub)
|
||||
|
||||
if sub.deliver_only:
|
||||
err = _deliver_payload(sub, prompt, adapters=adapters, loop=loop)
|
||||
if err:
|
||||
result.update(status="error", error=err)
|
||||
return result
|
||||
|
||||
# Agent mode: run one-shot agent, then deliver its final output.
|
||||
response = _run_agent_for_watcher(sub, prompt, adapters=adapters, loop=loop)
|
||||
if response:
|
||||
err = _deliver_payload(sub, response, adapters=adapters, loop=loop)
|
||||
if err:
|
||||
result.update(status="error", error=err)
|
||||
return result
|
||||
|
||||
|
||||
def tick(
|
||||
*,
|
||||
now: Optional[float] = None,
|
||||
adapters: Optional[Dict[str, Any]] = None,
|
||||
loop: Any = None,
|
||||
verbose: bool = False,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Poll every due watcher and return per-watcher outcomes.
|
||||
|
||||
Uses a file lock so concurrent ticks (gateway + daemon) don't double-fire.
|
||||
Returns an empty list silently if another tick holds the lock.
|
||||
"""
|
||||
lock_file = _lock_path()
|
||||
lock_fd = None
|
||||
try:
|
||||
lock_fd = open(lock_file, "w")
|
||||
if fcntl:
|
||||
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
elif msvcrt:
|
||||
msvcrt.locking(lock_fd.fileno(), msvcrt.LK_NBLCK, 1)
|
||||
except (OSError, IOError):
|
||||
logger.debug("Watcher tick skipped — another instance holds the lock")
|
||||
if lock_fd is not None:
|
||||
lock_fd.close()
|
||||
return []
|
||||
|
||||
try:
|
||||
outcomes: List[Dict[str, Any]] = []
|
||||
for sub in list_watchers():
|
||||
if not _is_due(sub, now=now):
|
||||
continue
|
||||
if verbose:
|
||||
logger.info("Watcher %s: polling (%s)", sub.name, sub.provider)
|
||||
outcome = run_watcher(sub, now=now, adapters=adapters, loop=loop)
|
||||
# Persist timing + last error back to the subscription.
|
||||
sub.last_run_at = now or time.time()
|
||||
sub.last_error = outcome.get("error")
|
||||
sub.last_event_count = outcome.get("new_events", 0)
|
||||
save_watcher(sub)
|
||||
outcomes.append(outcome)
|
||||
return outcomes
|
||||
finally:
|
||||
if lock_fd is not None:
|
||||
try:
|
||||
if fcntl:
|
||||
fcntl.flock(lock_fd, fcntl.LOCK_UN)
|
||||
except Exception:
|
||||
pass
|
||||
lock_fd.close()
|
||||
385
watchers/providers.py
Normal file
385
watchers/providers.py
Normal file
@@ -0,0 +1,385 @@
|
||||
"""Built-in watcher providers.
|
||||
|
||||
Each provider is a callable ``fetch_new(config, watermark) -> (items, new_watermark)``:
|
||||
|
||||
- ``config``: the provider-specific config dict from the subscription.
|
||||
- ``watermark``: the opaque dict the provider returned last time (empty on first run).
|
||||
- Returns a tuple of ``(new_items, new_watermark)``.
|
||||
|
||||
``new_items`` is a list of dicts (shape is provider-defined but should at
|
||||
minimum include a human-readable ``title`` and ``url`` field where
|
||||
applicable). ``new_watermark`` is persisted verbatim and handed back to the
|
||||
provider on the next tick.
|
||||
|
||||
Providers MUST be idempotent — if nothing new is available, return
|
||||
``([], watermark)``. Raising ``ProviderError`` marks the watcher as errored
|
||||
for the tick but preserves the watermark (no data loss on transient
|
||||
failures).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple
|
||||
from xml.etree import ElementTree as ET
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ProviderError(Exception):
|
||||
"""Raised when a provider fetch fails transiently.
|
||||
|
||||
Persistent config errors (missing required fields, bad URLs) should
|
||||
surface as ProviderError too; the watcher engine records ``last_error``
|
||||
on the subscription so ``hermes watch list`` shows the failure.
|
||||
"""
|
||||
|
||||
|
||||
# (config, watermark) -> (items, new_watermark)
|
||||
ProviderFn = Callable[[Dict[str, Any], Dict[str, Any]], Tuple[List[Dict[str, Any]], Dict[str, Any]]]
|
||||
|
||||
|
||||
PROVIDERS: Dict[str, ProviderFn] = {}
|
||||
|
||||
|
||||
def register(name: str, fn: ProviderFn) -> None:
|
||||
"""Register a custom provider under ``name``.
|
||||
|
||||
Plugins can call this at import time to add watcher providers. Names are
|
||||
case-insensitive and must be unique.
|
||||
"""
|
||||
key = name.lower()
|
||||
PROVIDERS[key] = fn
|
||||
|
||||
|
||||
def resolve_provider(name: str) -> ProviderFn:
|
||||
"""Look up a provider by name. Raises KeyError if unknown."""
|
||||
key = (name or "").lower()
|
||||
if key not in PROVIDERS:
|
||||
raise KeyError(f"Unknown watcher provider: {name!r}. Known: {sorted(PROVIDERS)}")
|
||||
return PROVIDERS[key]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Shared HTTP helper — honors optional header/timeout config across providers.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _http_get(url: str, *, headers: Optional[Dict[str, str]] = None, timeout: float = 20.0) -> bytes:
|
||||
req = urllib.request.Request(url)
|
||||
req.add_header("User-Agent", "Hermes-Watcher/1.0")
|
||||
if headers:
|
||||
for k, v in headers.items():
|
||||
req.add_header(k, v)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||||
return resp.read()
|
||||
except urllib.error.HTTPError as e:
|
||||
raise ProviderError(f"HTTP {e.code} from {url}") from e
|
||||
except urllib.error.URLError as e:
|
||||
raise ProviderError(f"HTTP error: {e.reason} ({url})") from e
|
||||
except (TimeoutError, OSError) as e:
|
||||
raise ProviderError(f"Network error: {e} ({url})") from e
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Provider: http_json
|
||||
#
|
||||
# Polls a JSON endpoint and treats the response as either:
|
||||
# - a top-level list of items, or
|
||||
# - a dict with an ``items_path`` pointing at the list via dotted keys.
|
||||
#
|
||||
# Dedup is by a configurable ``id_field`` (default ``"id"``). Watermark stores
|
||||
# a set of seen IDs, capped at ``max_seen`` (default 500) to bound memory.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _dig(obj: Any, path: str) -> Any:
|
||||
"""Dotted-path lookup: _dig({"a":{"b":[1,2]}}, "a.b") -> [1,2]."""
|
||||
if not path:
|
||||
return obj
|
||||
cur = obj
|
||||
for part in path.split("."):
|
||||
if isinstance(cur, dict) and part in cur:
|
||||
cur = cur[part]
|
||||
else:
|
||||
return None
|
||||
return cur
|
||||
|
||||
|
||||
def _provider_http_json(
|
||||
config: Dict[str, Any], watermark: Dict[str, Any]
|
||||
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
|
||||
url = config.get("url")
|
||||
if not url:
|
||||
raise ProviderError("http_json: 'url' is required")
|
||||
|
||||
id_field = config.get("id_field", "id")
|
||||
items_path = config.get("items_path", "")
|
||||
headers = config.get("headers") or {}
|
||||
max_seen = int(config.get("max_seen", 500))
|
||||
timeout = float(config.get("timeout", 20.0))
|
||||
|
||||
raw = _http_get(url, headers=headers, timeout=timeout)
|
||||
try:
|
||||
data = json.loads(raw.decode("utf-8"))
|
||||
except (UnicodeDecodeError, json.JSONDecodeError) as e:
|
||||
raise ProviderError(f"http_json: response is not valid JSON: {e}") from e
|
||||
|
||||
items = _dig(data, items_path) if items_path else data
|
||||
if not isinstance(items, list):
|
||||
raise ProviderError(
|
||||
f"http_json: items_path={items_path!r} did not resolve to a list"
|
||||
f" (got {type(items).__name__})"
|
||||
)
|
||||
|
||||
seen_ids = set(watermark.get("seen_ids") or [])
|
||||
is_first_run = not watermark # empty watermark = never polled
|
||||
|
||||
new_items: List[Dict[str, Any]] = []
|
||||
new_seen_ids: List[str] = []
|
||||
for item in items:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
ident = item.get(id_field)
|
||||
if ident is None:
|
||||
continue
|
||||
ident_str = str(ident)
|
||||
new_seen_ids.append(ident_str)
|
||||
if ident_str in seen_ids:
|
||||
continue
|
||||
# On first run, record all IDs but don't emit any as "new" events.
|
||||
# Otherwise the first tick of a watcher would replay the entire feed.
|
||||
if is_first_run:
|
||||
continue
|
||||
new_items.append(item)
|
||||
|
||||
# Cap the stored ID set. Keep the most recent entries — the response
|
||||
# order is provider-defined, so we just preserve insertion order and
|
||||
# trim the head.
|
||||
combined = list(seen_ids) + [i for i in new_seen_ids if i not in seen_ids]
|
||||
if len(combined) > max_seen:
|
||||
combined = combined[-max_seen:]
|
||||
|
||||
return new_items, {"seen_ids": combined, "last_polled_at": time.time()}
|
||||
|
||||
|
||||
register("http_json", _provider_http_json)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Provider: rss (Atom + RSS 2.0)
|
||||
#
|
||||
# Watermark = {"seen_guids": [...]}. On first run, record all existing GUIDs
|
||||
# as seen so the watcher only emits posts published AFTER it was created.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _parse_rss_entries(xml_bytes: bytes) -> List[Dict[str, Any]]:
|
||||
"""Return a list of {id, title, url, published, summary} dicts.
|
||||
|
||||
Handles both RSS 2.0 ``<item>`` and Atom ``<entry>``. Namespaces are
|
||||
stripped for cleaner lookups.
|
||||
"""
|
||||
|
||||
def _strip_ns(tag: str) -> str:
|
||||
return tag.split("}", 1)[1] if "}" in tag else tag
|
||||
|
||||
try:
|
||||
root = ET.fromstring(xml_bytes)
|
||||
except ET.ParseError as e:
|
||||
raise ProviderError(f"rss: invalid XML: {e}") from e
|
||||
|
||||
entries: List[Dict[str, Any]] = []
|
||||
|
||||
# RSS 2.0: <rss><channel><item>
|
||||
for item in root.iter():
|
||||
tag = _strip_ns(item.tag)
|
||||
if tag not in ("item", "entry"):
|
||||
continue
|
||||
children = {_strip_ns(c.tag): c for c in item}
|
||||
# ElementTree Elements with no children are *falsy* — use explicit
|
||||
# `is not None` checks when picking between possible tags.
|
||||
guid_el = children.get("guid")
|
||||
if guid_el is None:
|
||||
guid_el = children.get("id")
|
||||
link_el = children.get("link")
|
||||
if link_el is not None:
|
||||
# Atom: link is empty element with href attr; RSS: link has text.
|
||||
href = link_el.attrib.get("href") or (link_el.text or "").strip()
|
||||
else:
|
||||
href = ""
|
||||
guid = (guid_el.text.strip() if guid_el is not None and guid_el.text else "") or href
|
||||
if not guid:
|
||||
continue
|
||||
title_el = children.get("title")
|
||||
title = (title_el.text or "").strip() if title_el is not None else ""
|
||||
pub_el = children.get("pubDate")
|
||||
if pub_el is None:
|
||||
pub_el = children.get("published")
|
||||
if pub_el is None:
|
||||
pub_el = children.get("updated")
|
||||
published = (pub_el.text or "").strip() if pub_el is not None else ""
|
||||
summ_el = children.get("description")
|
||||
if summ_el is None:
|
||||
summ_el = children.get("summary")
|
||||
summary = (summ_el.text or "").strip() if summ_el is not None else ""
|
||||
entries.append(
|
||||
{
|
||||
"id": guid,
|
||||
"title": title,
|
||||
"url": href,
|
||||
"published": published,
|
||||
"summary": summary,
|
||||
}
|
||||
)
|
||||
|
||||
return entries
|
||||
|
||||
|
||||
def _provider_rss(
|
||||
config: Dict[str, Any], watermark: Dict[str, Any]
|
||||
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
|
||||
url = config.get("url")
|
||||
if not url:
|
||||
raise ProviderError("rss: 'url' is required")
|
||||
headers = config.get("headers") or {}
|
||||
max_seen = int(config.get("max_seen", 500))
|
||||
timeout = float(config.get("timeout", 20.0))
|
||||
|
||||
raw = _http_get(url, headers=headers, timeout=timeout)
|
||||
entries = _parse_rss_entries(raw)
|
||||
|
||||
seen = set(watermark.get("seen_guids") or [])
|
||||
is_first_run = not watermark
|
||||
|
||||
new_items: List[Dict[str, Any]] = []
|
||||
new_guids: List[str] = []
|
||||
for entry in entries:
|
||||
guid = entry["id"]
|
||||
new_guids.append(guid)
|
||||
if guid in seen:
|
||||
continue
|
||||
if is_first_run:
|
||||
continue
|
||||
new_items.append(entry)
|
||||
|
||||
combined = list(seen) + [g for g in new_guids if g not in seen]
|
||||
if len(combined) > max_seen:
|
||||
combined = combined[-max_seen:]
|
||||
return new_items, {"seen_guids": combined, "last_polled_at": time.time()}
|
||||
|
||||
|
||||
register("rss", _provider_rss)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Provider: github
|
||||
#
|
||||
# Polls either:
|
||||
# - ``repo`` mode: https://api.github.com/repos/<owner>/<repo>/issues or /releases
|
||||
# - ``search`` mode: https://api.github.com/search/issues?q=<query>
|
||||
#
|
||||
# Authenticates with ``GITHUB_TOKEN`` if present (avoids 60 req/hr anon limit).
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
_GITHUB_SCOPES = {"issues", "pulls", "releases", "commits"}
|
||||
|
||||
|
||||
def _provider_github(
|
||||
config: Dict[str, Any], watermark: Dict[str, Any]
|
||||
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
|
||||
import os
|
||||
|
||||
repo = config.get("repo")
|
||||
scope = (config.get("scope") or "issues").lower()
|
||||
search_query = config.get("search")
|
||||
per_page = int(config.get("per_page", 30))
|
||||
|
||||
if not repo and not search_query:
|
||||
raise ProviderError("github: one of 'repo' or 'search' is required")
|
||||
if scope not in _GITHUB_SCOPES and not search_query:
|
||||
raise ProviderError(
|
||||
f"github: scope must be one of {sorted(_GITHUB_SCOPES)} (got {scope!r})"
|
||||
)
|
||||
|
||||
if not re.fullmatch(r"[A-Za-z0-9._-]+/[A-Za-z0-9._-]+", repo or "") and repo:
|
||||
raise ProviderError(f"github: repo must be 'owner/name' (got {repo!r})")
|
||||
|
||||
if search_query:
|
||||
url = f"https://api.github.com/search/issues?q={urllib.parse.quote(search_query)}&per_page={per_page}"
|
||||
items_path = "items"
|
||||
elif scope == "commits":
|
||||
url = f"https://api.github.com/repos/{repo}/commits?per_page={per_page}"
|
||||
items_path = ""
|
||||
else:
|
||||
url = f"https://api.github.com/repos/{repo}/{scope}?per_page={per_page}&state=all"
|
||||
items_path = ""
|
||||
|
||||
headers = {"Accept": "application/vnd.github+json"}
|
||||
token = config.get("token") or os.getenv("GITHUB_TOKEN") or os.getenv("GH_TOKEN")
|
||||
if token:
|
||||
headers["Authorization"] = f"Bearer {token}"
|
||||
|
||||
raw = _http_get(url, headers=headers, timeout=30.0)
|
||||
try:
|
||||
data = json.loads(raw.decode("utf-8"))
|
||||
except (UnicodeDecodeError, json.JSONDecodeError) as e:
|
||||
raise ProviderError(f"github: response is not valid JSON: {e}") from e
|
||||
|
||||
items = _dig(data, items_path) if items_path else data
|
||||
if not isinstance(items, list):
|
||||
raise ProviderError(
|
||||
f"github: expected a list; got {type(items).__name__}"
|
||||
)
|
||||
|
||||
id_field = "sha" if scope == "commits" else "id"
|
||||
|
||||
seen = set(str(x) for x in (watermark.get("seen_ids") or []))
|
||||
is_first_run = not watermark
|
||||
|
||||
new_items: List[Dict[str, Any]] = []
|
||||
all_ids: List[str] = []
|
||||
for item in items:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
ident = str(item.get(id_field, "")) or ""
|
||||
if not ident:
|
||||
continue
|
||||
all_ids.append(ident)
|
||||
if ident in seen or is_first_run:
|
||||
continue
|
||||
# Flatten the interesting fields so the prompt template is readable.
|
||||
new_items.append(
|
||||
{
|
||||
"id": ident,
|
||||
"title": item.get("title") or item.get("name") or item.get("commit", {}).get("message", "").splitlines()[0:1] or "",
|
||||
"url": item.get("html_url") or item.get("url"),
|
||||
"number": item.get("number"),
|
||||
"state": item.get("state"),
|
||||
"author": (item.get("user") or {}).get("login") or (item.get("author") or {}).get("login"),
|
||||
"created_at": item.get("created_at") or (item.get("commit") or {}).get("author", {}).get("date"),
|
||||
"body": (item.get("body") or "")[:2000], # cap so the prompt stays bounded
|
||||
}
|
||||
)
|
||||
|
||||
combined = list(seen) + [i for i in all_ids if i not in seen]
|
||||
if len(combined) > int(config.get("max_seen", 500)):
|
||||
combined = combined[-int(config.get("max_seen", 500)):]
|
||||
|
||||
return new_items, {"seen_ids": combined, "last_polled_at": time.time()}
|
||||
|
||||
|
||||
register("github", _provider_github)
|
||||
|
||||
|
||||
# Re-export urllib.parse lazily so providers can use it without each
|
||||
# importing it separately. (Doing the import at module-top would force it
|
||||
# into every codepath that touches watchers.store.)
|
||||
import urllib.parse # noqa: E402
|
||||
172
watchers/store.py
Normal file
172
watchers/store.py
Normal file
@@ -0,0 +1,172 @@
|
||||
"""Watcher subscription + watermark storage.
|
||||
|
||||
Subscriptions persist to ``<hermes_home>/watchers.json``. Watermarks live
|
||||
under ``<hermes_home>/watchers/<name>.watermark.json`` — separate files so
|
||||
``hermes watch reset <name>`` can nuke a single watchermark without touching
|
||||
the subscription, and so provider state can be inspected in isolation.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import time
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
from utils import atomic_replace
|
||||
|
||||
|
||||
_SUBSCRIPTIONS_FILENAME = "watchers.json"
|
||||
|
||||
|
||||
@dataclass
|
||||
class WatcherSubscription:
|
||||
"""A watcher subscription as persisted to ``watchers.json``.
|
||||
|
||||
Mirrors the webhook subscription shape where possible so users who know
|
||||
one subsystem can reason about the other.
|
||||
"""
|
||||
|
||||
name: str
|
||||
provider: str
|
||||
config: Dict[str, Any] = field(default_factory=dict)
|
||||
interval_seconds: int = 300
|
||||
prompt: str = ""
|
||||
skills: List[str] = field(default_factory=list)
|
||||
deliver: str = "origin"
|
||||
deliver_only: bool = False
|
||||
enabled: bool = True
|
||||
created_at: float = field(default_factory=time.time)
|
||||
last_run_at: Optional[float] = None
|
||||
last_error: Optional[str] = None
|
||||
last_event_count: int = 0
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, d: Dict[str, Any]) -> "WatcherSubscription":
|
||||
"""Tolerant of missing keys — old subscription files gain defaults."""
|
||||
return cls(
|
||||
name=d["name"],
|
||||
provider=d["provider"],
|
||||
config=dict(d.get("config") or {}),
|
||||
interval_seconds=int(d.get("interval_seconds", 300)),
|
||||
prompt=str(d.get("prompt", "")),
|
||||
skills=list(d.get("skills") or []),
|
||||
deliver=str(d.get("deliver", "origin")),
|
||||
deliver_only=bool(d.get("deliver_only", False)),
|
||||
enabled=bool(d.get("enabled", True)),
|
||||
created_at=float(d.get("created_at") or time.time()),
|
||||
last_run_at=(float(d["last_run_at"]) if d.get("last_run_at") else None),
|
||||
last_error=d.get("last_error"),
|
||||
last_event_count=int(d.get("last_event_count") or 0),
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Path helpers — resolved at call time so HERMES_HOME overrides work in tests.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _subscriptions_path() -> Path:
|
||||
return get_hermes_home() / _SUBSCRIPTIONS_FILENAME
|
||||
|
||||
|
||||
def _watermark_dir() -> Path:
|
||||
return get_hermes_home() / "watchers"
|
||||
|
||||
|
||||
def _watermark_path(name: str) -> Path:
|
||||
return _watermark_dir() / f"{name}.watermark.json"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Subscription CRUD
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _load_all() -> Dict[str, Dict[str, Any]]:
|
||||
path = _subscriptions_path()
|
||||
if not path.exists():
|
||||
return {}
|
||||
try:
|
||||
raw = json.loads(path.read_text(encoding="utf-8"))
|
||||
return raw if isinstance(raw, dict) else {}
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
|
||||
def _save_all(subs: Dict[str, Dict[str, Any]]) -> None:
|
||||
path = _subscriptions_path()
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
tmp = path.with_suffix(".tmp")
|
||||
tmp.write_text(json.dumps(subs, indent=2, ensure_ascii=False), encoding="utf-8")
|
||||
atomic_replace(tmp, path)
|
||||
|
||||
|
||||
def list_watchers() -> List[WatcherSubscription]:
|
||||
return [WatcherSubscription.from_dict(v) for v in _load_all().values()]
|
||||
|
||||
|
||||
def get_watcher(name: str) -> Optional[WatcherSubscription]:
|
||||
raw = _load_all().get(name)
|
||||
return WatcherSubscription.from_dict(raw) if raw else None
|
||||
|
||||
|
||||
def save_watcher(sub: WatcherSubscription) -> None:
|
||||
all_subs = _load_all()
|
||||
all_subs[sub.name] = sub.to_dict()
|
||||
_save_all(all_subs)
|
||||
|
||||
|
||||
def delete_watcher(name: str) -> bool:
|
||||
all_subs = _load_all()
|
||||
if name not in all_subs:
|
||||
return False
|
||||
del all_subs[name]
|
||||
_save_all(all_subs)
|
||||
# Also remove watermark file so re-adding doesn't inherit stale state.
|
||||
wm = _watermark_path(name)
|
||||
if wm.exists():
|
||||
try:
|
||||
wm.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
return True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Watermark store — opaque per-provider state.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def load_watermark(name: str) -> Dict[str, Any]:
|
||||
"""Return the persisted watermark dict for ``name`` (empty if unset)."""
|
||||
path = _watermark_path(name)
|
||||
if not path.exists():
|
||||
return {}
|
||||
try:
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
return data if isinstance(data, dict) else {}
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
|
||||
def save_watermark(name: str, watermark: Dict[str, Any]) -> None:
|
||||
"""Persist the watermark dict for ``name``.
|
||||
|
||||
Providers should pass back whatever they were given via their ``fetch_new``
|
||||
return value. The scheduler persists it verbatim — the shape is entirely
|
||||
up to the provider.
|
||||
"""
|
||||
path = _watermark_path(name)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
tmp = path.with_suffix(".tmp")
|
||||
tmp.write_text(
|
||||
json.dumps(watermark, indent=2, ensure_ascii=False, sort_keys=True),
|
||||
encoding="utf-8",
|
||||
)
|
||||
atomic_replace(tmp, path)
|
||||
122
website/docs/user-guide/features/watchers.md
Normal file
122
website/docs/user-guide/features/watchers.md
Normal file
@@ -0,0 +1,122 @@
|
||||
---
|
||||
title: Watchers
|
||||
description: Poll external sources on an interval and trigger the agent when new items appear.
|
||||
---
|
||||
|
||||
# Watchers
|
||||
|
||||
Watchers poll an external source on an interval, detect new items via watermark-based dedup, and either deliver those items verbatim or hand them to a short-lived agent for reasoning. They are the **pull-based sibling** of webhooks:
|
||||
|
||||
| Trigger | Source | When |
|
||||
|---|---|---|
|
||||
| Webhook | external push | whenever the source calls you |
|
||||
| Cron | time-based | on a schedule, regardless of whether anything changed |
|
||||
| **Watcher** | **pull on interval** | **only when new data is detected** |
|
||||
|
||||
Inspired by Vellum Assistant's watcher system.
|
||||
|
||||
## Quick start
|
||||
|
||||
```bash
|
||||
# Watch a GitHub repo for new issues
|
||||
hermes watch add my-repo \
|
||||
--provider github --repo NousResearch/hermes-agent --scope issues \
|
||||
--interval 300 --deliver origin
|
||||
|
||||
# Watch an RSS feed, deliver raw (no agent)
|
||||
hermes watch add hn \
|
||||
--provider rss --url https://news.ycombinator.com/rss \
|
||||
--interval 900 --deliver telegram --deliver-only
|
||||
|
||||
# Watch any JSON endpoint
|
||||
hermes watch add api-changes \
|
||||
--provider http_json --url https://api.example.com/events \
|
||||
--arg id_field=event_id --arg items_path=data.events \
|
||||
--interval 60
|
||||
```
|
||||
|
||||
Watchers piggyback on the cron scheduler's tick loop — if cron is running (gateway or `hermes cron tick`), watchers run automatically.
|
||||
|
||||
## How dedup works
|
||||
|
||||
The first poll of a new watcher **records a baseline** and does NOT emit any events. This prevents the first tick from replaying the entire feed. Only items that appear after the baseline are delivered.
|
||||
|
||||
Each watcher keeps a **watermark file** at `~/.hermes/watchers/<name>.watermark.json` containing a bounded set of seen IDs. To force a replay (treat the next poll as first-run), use `hermes watch reset <name>`.
|
||||
|
||||
## Providers
|
||||
|
||||
### `http_json`
|
||||
|
||||
Polls a JSON endpoint and dedups by a configurable ID field.
|
||||
|
||||
| Config key | Default | Purpose |
|
||||
|---|---|---|
|
||||
| `url` | *(required)* | Endpoint to GET |
|
||||
| `id_field` | `id` | Field used to dedup items |
|
||||
| `items_path` | *(empty)* | Dotted path to the list (`data.events` etc.) if the response isn't a top-level list |
|
||||
| `headers` | `{}` | Dict of HTTP headers |
|
||||
| `max_seen` | `500` | Cap on the watermark ID set |
|
||||
| `timeout` | `20.0` | Request timeout (seconds) |
|
||||
|
||||
### `rss`
|
||||
|
||||
Parses RSS 2.0 or Atom feeds. Dedups by `<guid>` / `<id>`.
|
||||
|
||||
| Config key | Default | Purpose |
|
||||
|---|---|---|
|
||||
| `url` | *(required)* | Feed URL |
|
||||
| `headers` | `{}` | Optional HTTP headers |
|
||||
| `max_seen` | `500` | Cap on the watermark GUID set |
|
||||
|
||||
### `github`
|
||||
|
||||
Polls GitHub's REST API. Auth via `GITHUB_TOKEN` / `GH_TOKEN` environment variable (or `token` in config).
|
||||
|
||||
| Config key | Default | Purpose |
|
||||
|---|---|---|
|
||||
| `repo` | — | `owner/name` (one of `repo` or `search` is required) |
|
||||
| `scope` | `issues` | `issues` / `pulls` / `releases` / `commits` |
|
||||
| `search` | — | GitHub issues search query (alternative to `repo`) |
|
||||
| `per_page` | `30` | Results per page |
|
||||
|
||||
## Delivery
|
||||
|
||||
Uses the same delivery plumbing as cron — including the new `multi` / `all` routing intents. See [cron delivery options](./cron#delivery-options) for the full list of targets.
|
||||
|
||||
- `--deliver-only` sends the rendered prompt verbatim, skipping the agent (zero LLM cost). Good for digests / notifications.
|
||||
- Without `--deliver-only`, the prompt is handed to a short-lived agent (skills optional via `--skills`), and the agent's final response is delivered.
|
||||
|
||||
## Prompt template
|
||||
|
||||
`--prompt` supports three placeholders (unknown ones pass through verbatim):
|
||||
|
||||
- `{name}` — the watcher name
|
||||
- `{count}` — number of new items
|
||||
- `{items_json}` — JSON array of new items
|
||||
|
||||
If `--prompt` is omitted, a default like `"{name}: {count} new event(s) from the {provider} watcher.\n\nItems:\n{items_json}"` is used.
|
||||
|
||||
## Custom providers
|
||||
|
||||
Plugins can register additional providers at import time:
|
||||
|
||||
```python
|
||||
from watchers.providers import register
|
||||
|
||||
def my_provider(config, watermark):
|
||||
# Fetch, dedup, return (new_items, new_watermark).
|
||||
return [], watermark
|
||||
|
||||
register("my_provider", my_provider)
|
||||
```
|
||||
|
||||
## Subcommands
|
||||
|
||||
| Command | Purpose |
|
||||
|---|---|
|
||||
| `hermes watch add <name> --provider <p> ...` | Create or update a watcher |
|
||||
| `hermes watch list [--verbose]` | Show all watchers and their status |
|
||||
| `hermes watch remove <name>` | Delete a watcher + its watermark |
|
||||
| `hermes watch run <name>` | Fire one watcher out of band (respects dedup) |
|
||||
| `hermes watch reset <name>` | Clear the watermark — next run treats it as first poll |
|
||||
| `hermes watch tick` | Poll every due watcher now |
|
||||
@@ -63,6 +63,7 @@ const sidebars: SidebarsConfig = {
|
||||
label: 'Automation',
|
||||
items: [
|
||||
'user-guide/features/cron',
|
||||
'user-guide/features/watchers',
|
||||
'user-guide/features/delegation',
|
||||
'user-guide/features/kanban',
|
||||
'user-guide/features/kanban-tutorial',
|
||||
|
||||
Reference in New Issue
Block a user