Compare commits

...

2 Commits

Author SHA1 Message Date
alt-glitch
c97f0a6c82 refactor(apify): move Actor tools into bundled plugin
Re-shelve the three Apify tools from core into plugins/apify/, matching
the Spotify plugin pattern for optional third-party SaaS integrations.
tools/ is reserved for foundational capabilities; third-party service
integrations live in plugins/.

- plugins/apify/{__init__,tools,client}.py + plugin.yaml + README
  (kind: backend, auto-loads; registers via ctx.register_tool())
- remove apify_* from _HERMES_CORE_TOOLS in toolsets.py
  (TOOLSETS["apify"] entry kept, mirroring spotify)
- tests moved tests/tools/test_apify_tool.py -> tests/plugins/test_apify.py,
  import paths updated (34 tests pass)
- add JanHranicky to AUTHOR_MAP in scripts/release.py (CI gate)

The tools_config.py / config.py / lazy_deps.py / pyproject.toml setup +
config UX from the original commit is retained unchanged.

Co-authored-by: JanHranicky <jan.hranicky@seznam.cz>
2026-06-08 15:03:11 +05:30
JanHranicky
58e921a819 feat(apify): Actor execution tools — discover, start, collect
Cherry-picked from PR #41932 (JanHranicky). Original implementation
registered the three Apify tools as built-in core tools; the follow-up
commit moves them into a bundled plugin (plugins/apify/).

Co-authored-by: JanHranicky <jan.hranicky@seznam.cz>
2026-06-08 15:00:57 +05:30
12 changed files with 1245 additions and 1 deletions

View File

@@ -2932,6 +2932,14 @@ OPTIONAL_ENV_VARS = {
"password": True,
"category": "tool",
},
"APIFY_API_TOKEN": {
"description": "Apify API token for Actor execution tools (apify_discover, apify_start, apify_collect)",
"prompt": "Apify API token",
"url": "https://apify.com/account/integrations",
"tools": ["apify_discover", "apify_start", "apify_collect"],
"password": True,
"category": "tool",
},
"SEARXNG_URL": {
"description": "URL of your SearXNG instance for free self-hosted web search",
"prompt": "SearXNG URL (e.g. http://localhost:8080)",

View File

@@ -80,6 +80,7 @@ CONFIGURABLE_TOOLSETS = [
("discord_admin", "🛡️ Discord Server Admin", "list channels/roles, pin, assign roles"),
("yuanbao", "🤖 Yuanbao", "group info, member queries, DM"),
("computer_use", "🖱️ Computer Use (macOS)", "background desktop control via cua-driver"),
("apify", "🎭 Apify Actors", "discover, start, and collect Actor runs (requires APIFY_API_TOKEN)"),
]
@@ -112,7 +113,7 @@ def gui_toolset_label(label: str) -> str:
# `hermes tools` → X (Twitter) Search setup walks users through credential
# setup. The tool's check_fn means the schema still won't appear to the
# model if the credential later goes missing or expires.
_DEFAULT_OFF_TOOLSETS = {"moa", "homeassistant", "spotify", "discord", "discord_admin", "video", "video_gen", "x_search"}
_DEFAULT_OFF_TOOLSETS = {"moa", "homeassistant", "spotify", "discord", "discord_admin", "video", "video_gen", "x_search", "apify"}
def _xai_credentials_present() -> bool:
@@ -562,6 +563,27 @@ TOOL_CATEGORIES = {
},
],
},
"apify": {
"name": "Apify Actors",
"icon": "🕷️",
"providers": [
{
"name": "Apify",
"badge": "paid",
"tag": (
"Run any Actor from the Apify Store — social media, "
"Google Maps, e-commerce, and more."
),
"env_vars": [
{
"key": "APIFY_API_TOKEN",
"prompt": "Apify API token",
"url": "https://apify.com/account/integrations",
},
],
},
],
},
}
# Simple env-var requirements for toolsets NOT in TOOL_CATEGORIES.

41
plugins/apify/README.md Normal file
View File

@@ -0,0 +1,41 @@
# Apify Actor Tools
Bundled plugin that brings [Apify](https://apify.com/) Actors into Hermes. Apify
hosts 20,000+ ready-made Actors for web automation and data extraction
(Instagram, YouTube, Google Maps, LinkedIn, Amazon, and more). The agent can
discover the right Actor, inspect its input schema, run it, and collect
structured results.
## Tools
| Tool | What it does |
|------|--------------|
| `apify_discover` | Search the Apify Store by keyword, or fetch a specific Actor's input schema + README by `actor_id`. |
| `apify_start` | Fire-and-forget batch Actor starts (up to 10 per call). Returns run refs immediately so the agent keeps reasoning while Actors run. |
| `apify_collect` | Poll run statuses and return completed dataset results, wrapped in `EXTERNAL_UNTRUSTED_CONTENT` markers. Supports `limit` / `may_have_more` pagination. |
## Setup
1. Create an Apify account and get a token at
<https://apify.com/account/integrations>.
2. Run `hermes tools`, open **Apify Actors**, enable the toolset, and paste your
token. (The token is stored in `~/.hermes/.env` as `APIFY_API_TOKEN` and is
never sent to the model.)
The `apify` toolset is **off by default**. The three tools register on startup
but stay invisible to the model until `APIFY_API_TOKEN` is set (runtime
`check_fn` gate).
## Architecture
This is a bundled `kind: backend` plugin (auto-loads, no opt-in), modeled on the
Spotify plugin. Tool registration goes through the plugin API
(`ctx.register_tool()`), so the Apify tools never enter `_HERMES_CORE_TOOLS`. The
`apify-client` SDK is installed on demand via `tools.lazy_deps` (`search.apify`)
the first time an Actor runs.
| File | Purpose |
|------|---------|
| `__init__.py` | `register(ctx)` — wires the three tools via `ctx.register_tool()`. |
| `tools.py` | Handlers + JSON schemas. |
| `client.py` | Lazy `apify-client` import, token validation, module-level cache. |

72
plugins/apify/__init__.py Normal file
View File

@@ -0,0 +1,72 @@
"""Apify Actor execution plugin — bundled, auto-loaded.
Registers three tools (``apify_discover``, ``apify_start``, ``apify_collect``)
into the ``apify`` toolset. Each tool is gated by ``_check_token()`` — when the
user has not set ``APIFY_API_TOKEN`` the tools stay registered (so they appear
in ``hermes tools``) but the runtime check prevents dispatch.
Why a plugin instead of top-level ``tools/`` files?
- ``plugins/`` is where third-party service integrations live (see
``plugins/spotify/`` for the same pattern — optional SaaS, token-gated,
default-off toolset). ``tools/`` is reserved for foundational capabilities
(terminal, read_file, web_search, etc.).
- Bundled + ``kind: backend`` auto-loads on startup just like the Spotify
plugin — no user opt-in needed, no ``plugins.enabled`` config.
- Keeps the three Apify tools out of ``_HERMES_CORE_TOOLS`` in ``toolsets.py``;
the plugin loader registers them via ``ctx.register_tool()``.
The ``apify`` toolset is default-off (``_DEFAULT_OFF_TOOLSETS`` in
``hermes_cli/tools_config.py``) and the ``APIFY_API_TOKEN`` setup UX is wired
through ``hermes tools`` (``TOOL_CATEGORIES``) and ``OPTIONAL_ENV_VARS``.
"""
from __future__ import annotations
import json
from typing import Any, Dict
from plugins.apify.tools import (
_COLLECT_SCHEMA,
_DISCOVER_SCHEMA,
_START_SCHEMA,
_check_token,
_collect_handler,
_discover_handler,
_start_handler,
)
async def _collect_handler_str(args: Dict[str, Any], **_kw: Any) -> str:
return json.dumps(await _collect_handler(args), default=str)
def register(ctx) -> None:
"""Register the Apify Actor tools. Called once by the plugin loader."""
ctx.register_tool(
name="apify_discover",
toolset="apify",
schema=_DISCOVER_SCHEMA,
handler=lambda args, **kw: json.dumps(_discover_handler(args), default=str),
check_fn=_check_token,
requires_env=["APIFY_API_TOKEN"],
emoji="🔍",
)
ctx.register_tool(
name="apify_start",
toolset="apify",
schema=_START_SCHEMA,
handler=lambda args, **kw: json.dumps(_start_handler(args), default=str),
check_fn=_check_token,
requires_env=["APIFY_API_TOKEN"],
emoji="▶️",
)
ctx.register_tool(
name="apify_collect",
toolset="apify",
schema=_COLLECT_SCHEMA,
handler=_collect_handler_str,
check_fn=_check_token,
requires_env=["APIFY_API_TOKEN"],
is_async=True,
emoji="📦",
)

65
plugins/apify/client.py Normal file
View File

@@ -0,0 +1,65 @@
"""Shared Apify SDK client — lazy import, token validation, and cache.
Used by the Apify Actor execution tools (plugins/apify/tools.py). The
``apify-client`` SDK is installed on demand via ``tools.lazy_deps`` so the
dependency is only pulled when the user actually enables the plugin and runs
an Actor.
"""
from __future__ import annotations
import os
from typing import Any, Optional
# Sent with every request so Apify can attribute traffic to this integration.
_HERMES_HEADERS = {"x-apify-integration-platform": "hermes-agent"}
_CLIENT_CLS: Optional[type] = None
_CLIENT: Optional[Any] = None
_CLIENT_CONFIG: Optional[Any] = None
def _load_client_cls() -> type:
global _CLIENT_CLS
if _CLIENT_CLS is None:
try:
from tools.lazy_deps import ensure as _lazy_ensure
_lazy_ensure("search.apify", prompt=False)
except ImportError:
pass
except Exception as exc: # noqa: BLE001
raise ImportError(str(exc))
from apify_client import ApifyClient
_CLIENT_CLS = ApifyClient
return _CLIENT_CLS
def check_apify_api_key() -> bool:
"""Return True when APIFY_API_TOKEN is configured."""
return bool(os.getenv("APIFY_API_TOKEN", "").strip())
def get_apify_client() -> Any:
"""Return a cached ApifyClient built from APIFY_API_TOKEN.
Raises ValueError when the token is not set.
"""
global _CLIENT, _CLIENT_CONFIG
api_token = os.getenv("APIFY_API_TOKEN", "").strip()
if not api_token:
raise ValueError(
"Apify tools are not configured. "
"Set APIFY_API_TOKEN (get one at https://apify.com/account/integrations)."
)
client_config = ("direct", api_token)
if _CLIENT is not None and _CLIENT_CONFIG == client_config:
return _CLIENT
_CLIENT = _load_client_cls()(token=api_token, headers=_HERMES_HEADERS)
_CLIENT_CONFIG = client_config
return _CLIENT
def _reset_client_for_tests() -> None:
"""Drop cached client so tests can re-instantiate cleanly."""
global _CLIENT, _CLIENT_CONFIG
_CLIENT = None
_CLIENT_CONFIG = None

View File

@@ -0,0 +1,9 @@
name: apify
version: 1.0.0
description: "Apify Actor execution — 3 tools (discover, start, collect) for running any of 20,000+ Actors from the Apify Store (web automation, data extraction, social media, maps, e-commerce). Gated on APIFY_API_TOKEN. Toolset is default-off; enable via `hermes tools` → Apify Actors."
author: JanHranicky
kind: backend
provides_tools:
- apify_discover
- apify_start
- apify_collect

390
plugins/apify/tools.py Normal file
View File

@@ -0,0 +1,390 @@
"""Apify Actor execution tools — discover, start, collect.
Handlers and schemas for the three Apify tools. Registration happens in
``plugins/apify/__init__.py`` via ``ctx.register_tool()`` (the plugin API),
not via direct ``registry.register()`` calls.
"""
from __future__ import annotations
import asyncio
import json
import logging
from typing import Any, Dict, List
logger = logging.getLogger(__name__)
_TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "ABORTED", "TIMED-OUT"}
_MAX_BATCH_RUNS = 10
_COLLECT_DEFAULT_LIMIT = 100
def _attr(obj: Any, key: str, default: Any = None) -> Any:
"""Get attribute or dict key from SDK response objects (apify_client returns either)."""
if isinstance(obj, dict):
return obj.get(key, default)
if obj is not None and hasattr(obj, key):
return getattr(obj, key)
return default
def _get_client() -> Any:
from plugins.apify.client import get_apify_client
return get_apify_client()
def _check_token() -> bool:
from plugins.apify.client import check_apify_api_key
return check_apify_api_key()
# ---------------------------------------------------------------------------
# Handlers
# ---------------------------------------------------------------------------
def _discover_handler(args: Dict[str, Any]) -> Dict[str, Any]:
from tools.interrupt import is_interrupted
if is_interrupted():
return {"error": "Interrupted"}
query = (_attr(args, "query") or "").strip() or None
actor_id = (_attr(args, "actor_id") or "").strip() or None
if not query and not actor_id:
return {
"error": (
"Provide exactly one of 'query' (to search the Apify Store) "
"or 'actor_id' (to fetch an Actor's input schema)."
)
}
client = _get_client()
if actor_id:
try:
actor_info = client.actor(actor_id).get()
if actor_info is None:
return {
"error": (
f"Actor '{actor_id}' not found. "
"Check the ID format: username~actor-name."
)
}
input_schema: Any = None
readme: Any = None
build_detail = client.actor(actor_id).default_build().get()
if build_detail is not None:
actor_def = _attr(build_detail, "actorDefinition") or {}
raw_schema = _attr(actor_def, "input")
if raw_schema:
input_schema = json.dumps(raw_schema)
else:
fallback = _attr(build_detail, "inputSchema")
if fallback:
input_schema = str(fallback)
raw_readme = _attr(actor_def, "readme") or _attr(build_detail, "readme")
if raw_readme:
readme = str(raw_readme)[:3000]
username = _attr(actor_info, "username", "")
name = _attr(actor_info, "name", "")
title = _attr(actor_info, "title", "") or name
return {
"actor_id": f"{username}~{name}",
"name": name,
"title": title,
"username": username,
"description": _attr(actor_info, "description", ""),
"input_schema": input_schema,
"readme": readme,
"tip": (
f"Use apify_start with actor_id='{username}~{name}' "
"and an input matching the input_schema above."
),
}
except Exception as exc: # noqa: BLE001
logger.warning("apify_discover schema fetch error for %s: %s", actor_id, exc)
return {"error": str(exc)}
# Store search
try:
result = client.store().list(search=query, limit=10, sort_by="relevance")
items = _attr(result, "items") or []
actors: List[Dict[str, Any]] = []
for item in items:
stats = _attr(item, "stats") or {}
name = _attr(item, "name", "")
username = _attr(item, "username", "")
title = _attr(item, "title") or name
desc = (_attr(item, "description") or "")[:200]
run_count = _attr(stats, "totalRuns", 0) or 0
rating = _attr(stats, "averageRating")
actors.append({
"actor_id": f"{username}~{name}",
"name": name,
"title": title,
"username": username,
"description": desc,
"run_count": run_count,
"rating": rating,
})
return {"actors": actors}
except Exception as exc: # noqa: BLE001
logger.warning("apify_discover store search error for '%s': %s", query, exc)
return {"error": str(exc)}
def _start_handler(args: Dict[str, Any]) -> Dict[str, Any]:
from tools.interrupt import is_interrupted
if is_interrupted():
return {"error": "Interrupted"}
run_specs = args.get("runs") or []
if not run_specs:
return {"error": "Provide at least one run spec in 'runs'."}
if len(run_specs) > _MAX_BATCH_RUNS:
return {"error": f"Batch too large: {len(run_specs)} runs requested, maximum is {_MAX_BATCH_RUNS}."}
client = _get_client()
started: List[Dict[str, Any]] = []
errors: List[Dict[str, Any]] = []
for spec in run_specs:
from tools.interrupt import is_interrupted
if is_interrupted():
break
actor_id = (spec.get("actor_id") or "").strip()
run_input = spec.get("input") or {}
label = spec.get("label")
if not actor_id:
errors.append({"error": "Missing 'actor_id' in run spec."})
continue
try:
run = client.actor(actor_id).start(run_input=run_input)
entry: Dict[str, Any] = {
"run_id": _attr(run, "id"),
"actor_id": actor_id,
"dataset_id": _attr(run, "default_dataset_id"),
"status": _attr(run, "status"),
}
if label:
entry["label"] = label
started.append(entry)
except Exception as exc: # noqa: BLE001
logger.warning("apify_start error for %s: %s", actor_id, exc)
err: Dict[str, Any] = {"actor_id": actor_id, "error": str(exc)}
if label:
err["label"] = label
errors.append(err)
result: Dict[str, Any] = {"runs": started}
if errors:
result["errors"] = errors
return result
async def _collect_handler(args: Dict[str, Any]) -> Dict[str, Any]:
from tools.interrupt import is_interrupted
if is_interrupted():
return {"error": "Interrupted"}
run_refs = args.get("runs") or []
if not run_refs:
return {"error": "Provide 'runs' array (from apify_start)."}
limit = int(args.get("limit") or _COLLECT_DEFAULT_LIMIT)
client = _get_client()
async def _check_run(ref: Dict[str, Any]) -> Dict[str, Any]:
run_id = ref.get("run_id", "")
actor_id = ref.get("actor_id", "")
dataset_id = ref.get("dataset_id", "")
label = ref.get("label")
base: Dict[str, Any] = {
"run_id": run_id,
"actor_id": actor_id,
"dataset_id": dataset_id,
}
if label:
base["label"] = label
try:
run_info = await asyncio.to_thread(client.run(run_id).get)
if run_info is None:
return {**base, "_type": "error", "error": "Run not found."}
status = _attr(run_info, "status", "UNKNOWN")
base["status"] = status
if status not in _TERMINAL_STATUSES:
return {**base, "_type": "pending"}
if status != "SUCCEEDED":
return {**base, "_type": "error", "error": f"Run ended with status: {status}"}
# SUCCEEDED — fetch dataset and wrap as external content
dataset_result = await asyncio.to_thread(
client.dataset(dataset_id).list_items, limit=limit
)
items = list(_attr(dataset_result, "items") or [])
may_have_more = len(items) == limit
if may_have_more:
logger.warning(
"apify_collect run %s: fetched %d items (hit limit=%d) — "
"dataset may have more; re-call with a higher limit if needed",
run_id, len(items), limit,
)
raw = json.dumps(items, indent=2, default=str)
if len(raw) > 50_000:
raw = raw[:50_000] + "\n\n[…truncated]"
wrapped = (
"<<<EXTERNAL_UNTRUSTED_CONTENT>>>\n"
+ raw
+ "\n<<<END_EXTERNAL_UNTRUSTED_CONTENT>>>"
)
result: Dict[str, Any] = {
**base,
"_type": "completed",
"result_count": len(items),
"data": wrapped,
}
if may_have_more:
result["may_have_more"] = True
return result
except Exception as exc: # noqa: BLE001
logger.warning("apify_collect error for run %s: %s", run_id, exc)
return {**base, "_type": "error", "error": str(exc)}
raw_results = await asyncio.gather(*[_check_run(ref) for ref in run_refs])
completed: List[Dict[str, Any]] = []
pending: List[Dict[str, Any]] = []
errors: List[Dict[str, Any]] = []
for r in raw_results:
t = r.pop("_type", "error")
if t == "pending":
pending.append(r)
elif t == "error":
errors.append(r)
else:
completed.append(r)
return {
"all_done": len(pending) == 0,
"completed": completed,
"pending": pending,
"errors": errors,
}
# ---------------------------------------------------------------------------
# Schemas
# ---------------------------------------------------------------------------
_DISCOVER_SCHEMA: Dict[str, Any] = {
"name": "apify_discover",
"description": (
"Search the Apify Store for Actors by keyword, or fetch an Actor's "
"input schema and README. Provide 'query' to search, or 'actor_id' "
"to inspect a specific Actor. Actor IDs use tilde: username~actor-name."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Keywords to search the Apify Store (e.g. 'instagram scraper').",
},
"actor_id": {
"type": "string",
"description": (
"Actor ID to fetch its input schema and README "
"(e.g. 'apify~google-search-scraper')."
),
},
},
},
}
_START_SCHEMA: Dict[str, Any] = {
"name": "apify_start",
"description": (
"Start one or more Apify Actor runs. Returns run references immediately "
f"(fire-and-forget). Pass the returned run refs to apify_collect to get results. "
f"Maximum {_MAX_BATCH_RUNS} runs per call."
),
"parameters": {
"type": "object",
"properties": {
"runs": {
"type": "array",
"description": f"List of Actor runs to start (max {_MAX_BATCH_RUNS}).",
"maxItems": _MAX_BATCH_RUNS,
"items": {
"type": "object",
"properties": {
"actor_id": {
"type": "string",
"description": "Actor ID (username~actor-name).",
},
"input": {
"type": "object",
"description": "Actor input parameters.",
},
"label": {
"type": "string",
"description": "Optional label to identify this run in results.",
},
},
"required": ["actor_id"],
},
},
},
"required": ["runs"],
},
}
_COLLECT_SCHEMA: Dict[str, Any] = {
"name": "apify_collect",
"description": (
"Poll the status of Apify Actor runs started with apify_start. "
"Returns completed results, still-running refs, and errors. "
"Re-call with the same run refs until all_done is true."
),
"parameters": {
"type": "object",
"properties": {
"runs": {
"type": "array",
"description": "Run references returned by apify_start.",
"items": {
"type": "object",
"properties": {
"run_id": {"type": "string"},
"actor_id": {"type": "string"},
"dataset_id": {"type": "string"},
"label": {"type": "string"},
},
"required": ["run_id", "actor_id", "dataset_id"],
},
},
"limit": {
"type": "integer",
"description": (
f"Max dataset items to fetch per run (default {_COLLECT_DEFAULT_LIMIT}). "
"Increase if may_have_more is true in a previous response."
),
},
},
"required": ["runs"],
},
}

View File

@@ -109,6 +109,7 @@ anthropic = ["anthropic==0.86.0"]
# search provider (configured via `hermes tools` or config.yaml).
exa = ["exa-py==2.10.2"]
firecrawl = ["firecrawl-py==4.17.0"]
apify = ["apify-client==3.0.1"]
parallel-web = ["parallel-web==0.4.2"]
# Image generation backends
fal = ["fal-client==0.13.1"]

View File

@@ -132,6 +132,7 @@ AUTHOR_MAP = {
"tillfalko@gmail.com": "tillfalko",
"hi@fesalfayed.com": "fesalfayed",
"marek.les@seznam.cz": "maxcz79",
"jan.hranicky@seznam.cz": "JanHranicky",
# teknium (multiple emails)
"teknium1@gmail.com": "teknium1",
"kenyon1977@gmail.com": "kenyonxu",

625
tests/plugins/test_apify.py Normal file
View File

@@ -0,0 +1,625 @@
"""Tests for plugins/apify/tools.py — all mocked, no live Actor calls."""
from __future__ import annotations
import json
from unittest.mock import MagicMock
import pytest
# ---------------------------------------------------------------------------
# Shared fixture
# ---------------------------------------------------------------------------
@pytest.fixture
def mock_client(monkeypatch):
"""Patch _get_client to return a MagicMock client.
NOTE: _attr() checks hasattr() first, which always returns True on MagicMock.
The 'default' argument in _attr() is never used on MagicMock objects.
Always explicitly set every attribute you want to read in your tests.
"""
client = MagicMock()
monkeypatch.setattr(
"plugins.apify.tools._get_client",
lambda: client,
)
return client
@pytest.fixture(autouse=True)
def not_interrupted(monkeypatch):
"""Default: is_interrupted() returns False."""
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False)
# ---------------------------------------------------------------------------
# apify_discover — store search
# ---------------------------------------------------------------------------
class TestDiscoverStoreSearch:
def test_returns_actors_list(self, mock_client):
actor_mock = MagicMock()
actor_mock.username = "apify"
actor_mock.name = "instagram-scraper"
actor_mock.title = "Instagram Scraper"
actor_mock.description = "Scrapes Instagram profiles."
stats_mock = MagicMock()
stats_mock.totalRuns = 50000
stats_mock.averageRating = 4.7
actor_mock.stats = stats_mock
list_result = MagicMock()
list_result.items = [actor_mock]
mock_client.store.return_value.list.return_value = list_result
from plugins.apify.tools import _discover_handler
result = _discover_handler({"query": "instagram scraper"})
assert "actors" in result
assert len(result["actors"]) == 1
a = result["actors"][0]
assert a["actor_id"] == "apify~instagram-scraper"
assert a["name"] == "instagram-scraper"
assert a["title"] == "Instagram Scraper"
assert a["username"] == "apify"
assert a["run_count"] == 50000
assert a["rating"] == 4.7
mock_client.store.return_value.list.assert_called_once_with(
search="instagram scraper", limit=10, sort_by="relevance"
)
def test_description_truncated_to_200_chars(self, mock_client):
actor_mock = MagicMock()
actor_mock.username = "apify"
actor_mock.name = "test-actor"
actor_mock.title = "Test"
actor_mock.description = "x" * 300
actor_mock.stats = MagicMock(totalRuns=0, averageRating=None)
list_result = MagicMock()
list_result.items = [actor_mock]
mock_client.store.return_value.list.return_value = list_result
from plugins.apify.tools import _discover_handler
result = _discover_handler({"query": "test"})
assert len(result["actors"][0]["description"]) == 200
def test_store_search_api_error_returns_error_dict(self, mock_client):
mock_client.store.return_value.list.side_effect = RuntimeError("API error")
from plugins.apify.tools import _discover_handler
result = _discover_handler({"query": "test"})
assert "error" in result
assert "API error" in result["error"]
# ---------------------------------------------------------------------------
# apify_discover — actor schema fetch
# ---------------------------------------------------------------------------
class TestDiscoverActorSchema:
def _setup_build_mock(self, mock_client, *, input_schema=None, readme=None):
"""Helper: wire up actor + build mock chain for default_build() path."""
actor_info = MagicMock()
actor_info.username = "apify"
actor_info.name = "google-search-scraper"
actor_info.title = "Google Search Scraper"
actor_info.description = "Scrapes Google Search."
mock_client.actor.return_value.get.return_value = actor_info
build_detail = MagicMock()
actor_def = MagicMock()
actor_def.input = input_schema # dict or None
actor_def.readme = readme
build_detail.actorDefinition = actor_def
build_detail.inputSchema = None
build_detail.readme = None
mock_client.actor.return_value.default_build.return_value.get.return_value = build_detail
return actor_info, build_detail
def test_returns_actor_schema(self, mock_client):
schema = {"type": "object", "properties": {"query": {"type": "string"}}}
self._setup_build_mock(mock_client, input_schema=schema, readme="# README content")
from plugins.apify.tools import _discover_handler
result = _discover_handler({"actor_id": "apify~google-search-scraper"})
assert result["actor_id"] == "apify~google-search-scraper"
assert result["name"] == "google-search-scraper"
assert result["title"] == "Google Search Scraper"
assert result["username"] == "apify"
assert result["description"] == "Scrapes Google Search."
assert json.loads(result["input_schema"]) == schema
assert result["readme"] == "# README content"
assert "apify_start" in result["tip"]
assert "apify~google-search-scraper" in result["tip"]
def test_readme_truncated_to_3000_chars(self, mock_client):
self._setup_build_mock(mock_client, readme="R" * 4000)
from plugins.apify.tools import _discover_handler
result = _discover_handler({"actor_id": "apify~google-search-scraper"})
assert len(result["readme"]) == 3000
def test_falls_back_to_build_input_schema_string(self, mock_client):
"""When actorDefinition.input is None, fall back to build.inputSchema string."""
_, build_detail = self._setup_build_mock(mock_client, input_schema=None)
build_detail.inputSchema = '{"type":"object"}'
build_detail.actorDefinition.input = None
from plugins.apify.tools import _discover_handler
result = _discover_handler({"actor_id": "apify~google-search-scraper"})
assert result["input_schema"] == '{"type":"object"}'
def test_actor_not_found_returns_error(self, mock_client):
mock_client.actor.return_value.get.return_value = None
from plugins.apify.tools import _discover_handler
result = _discover_handler({"actor_id": "apify~nonexistent"})
assert "error" in result
assert "not found" in result["error"]
# ---------------------------------------------------------------------------
# apify_discover — validation
# ---------------------------------------------------------------------------
class TestDiscoverValidation:
def test_missing_both_params_returns_error(self, mock_client):
from plugins.apify.tools import _discover_handler
result = _discover_handler({})
assert "error" in result
assert "query" in result["error"]
assert "actor_id" in result["error"]
def test_empty_string_params_treated_as_missing(self, mock_client):
from plugins.apify.tools import _discover_handler
result = _discover_handler({"query": " ", "actor_id": ""})
assert "error" in result
def test_interrupted_returns_error(self, mock_client, monkeypatch):
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: True)
from plugins.apify.tools import _discover_handler
result = _discover_handler({"query": "test"})
assert result == {"error": "Interrupted"}
mock_client.store.assert_not_called()
# ---------------------------------------------------------------------------
# apify_start
# ---------------------------------------------------------------------------
class TestStart:
def _make_run_mock(self, run_id="r1", dataset_id="d1", status="QUEUED"):
run = MagicMock()
run.id = run_id
run.default_dataset_id = dataset_id
run.status = status
return run
def test_single_run_returns_run_ref(self, mock_client):
mock_client.actor.return_value.start.return_value = self._make_run_mock()
from plugins.apify.tools import _start_handler
result = _start_handler({
"runs": [{"actor_id": "apify~test", "input": {"key": "val"}}]
})
assert "runs" in result
assert len(result["runs"]) == 1
r = result["runs"][0]
assert r["run_id"] == "r1"
assert r["actor_id"] == "apify~test"
assert r["dataset_id"] == "d1"
assert r["status"] == "QUEUED"
mock_client.actor.return_value.start.assert_called_once_with(
run_input={"key": "val"}
)
def test_label_included_when_provided(self, mock_client):
mock_client.actor.return_value.start.return_value = self._make_run_mock()
from plugins.apify.tools import _start_handler
result = _start_handler({
"runs": [{"actor_id": "apify~test", "input": {}, "label": "my-run"}]
})
assert result["runs"][0]["label"] == "my-run"
def test_label_absent_when_not_provided(self, mock_client):
mock_client.actor.return_value.start.return_value = self._make_run_mock()
from plugins.apify.tools import _start_handler
result = _start_handler({
"runs": [{"actor_id": "apify~test", "input": {}}]
})
assert "label" not in result["runs"][0]
def test_batch_start_two_runs(self, mock_client):
run1 = self._make_run_mock(run_id="r1", dataset_id="d1")
run2 = self._make_run_mock(run_id="r2", dataset_id="d2")
mock_client.actor.return_value.start.side_effect = [run1, run2]
from plugins.apify.tools import _start_handler
result = _start_handler({
"runs": [
{"actor_id": "apify~actor-a", "input": {}},
{"actor_id": "apify~actor-b", "input": {}},
]
})
assert len(result["runs"]) == 2
assert result["runs"][0]["run_id"] == "r1"
assert result["runs"][1]["run_id"] == "r2"
def test_per_run_api_error_goes_to_errors(self, mock_client):
mock_client.actor.return_value.start.side_effect = RuntimeError("not found")
from plugins.apify.tools import _start_handler
result = _start_handler({
"runs": [{"actor_id": "apify~bad-actor", "input": {}}]
})
assert result["runs"] == []
assert len(result["errors"]) == 1
assert "not found" in result["errors"][0]["error"]
def test_interrupted_returns_early(self, mock_client, monkeypatch):
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: True)
from plugins.apify.tools import _start_handler
result = _start_handler({"runs": [{"actor_id": "apify~test", "input": {}}]})
assert result == {"error": "Interrupted"}
mock_client.actor.assert_not_called()
def test_interrupt_mid_batch_stops_remaining_runs(self, mock_client, monkeypatch):
call_count = 0
original_run = self._make_run_mock()
def start_side_effect(*args, **kwargs):
nonlocal call_count
call_count += 1
return original_run
mock_client.actor.return_value.start.side_effect = start_side_effect
# entry check → False, loop iter 1 → False (fires run), loop iter 2 → True (stops)
interrupted_after_first = iter([False, False, True])
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: next(interrupted_after_first))
from plugins.apify.tools import _start_handler
result = _start_handler({
"runs": [
{"actor_id": "apify~actor-a", "input": {}},
{"actor_id": "apify~actor-b", "input": {}},
{"actor_id": "apify~actor-c", "input": {}},
]
})
assert call_count == 1 # only the first run fired before interrupt
assert len(result["runs"]) == 1
def test_batch_over_limit_returns_error(self, mock_client):
from plugins.apify.tools import _start_handler, _MAX_BATCH_RUNS
oversized = [{"actor_id": f"apify~actor-{i}", "input": {}} for i in range(_MAX_BATCH_RUNS + 1)]
result = _start_handler({"runs": oversized})
assert "error" in result
assert str(_MAX_BATCH_RUNS) in result["error"]
mock_client.actor.assert_not_called()
# ---------------------------------------------------------------------------
# apify_collect — non-terminal states and errors
# ---------------------------------------------------------------------------
class TestCollectNonTerminal:
@pytest.mark.asyncio
async def test_running_run_goes_to_pending(self, mock_client):
run_info = MagicMock()
run_info.status = "RUNNING"
mock_client.run.return_value.get.return_value = run_info
from plugins.apify.tools import _collect_handler
result = await _collect_handler({
"runs": [{"run_id": "r1", "actor_id": "apify~test", "dataset_id": "d1"}]
})
assert result["all_done"] is False
assert len(result["pending"]) == 1
assert result["pending"][0]["run_id"] == "r1"
assert result["pending"][0]["status"] == "RUNNING"
assert result["completed"] == []
assert result["errors"] == []
@pytest.mark.asyncio
async def test_queued_run_goes_to_pending(self, mock_client):
run_info = MagicMock()
run_info.status = "QUEUED"
mock_client.run.return_value.get.return_value = run_info
from plugins.apify.tools import _collect_handler
result = await _collect_handler({
"runs": [{"run_id": "r1", "actor_id": "apify~test", "dataset_id": "d1"}]
})
assert result["all_done"] is False
assert result["pending"][0]["status"] == "QUEUED"
@pytest.mark.asyncio
async def test_failed_run_goes_to_errors(self, mock_client):
run_info = MagicMock()
run_info.status = "FAILED"
mock_client.run.return_value.get.return_value = run_info
from plugins.apify.tools import _collect_handler
result = await _collect_handler({
"runs": [{"run_id": "r1", "actor_id": "apify~test", "dataset_id": "d1"}]
})
assert result["all_done"] is True # no pending
assert result["errors"][0]["status"] == "FAILED"
assert "FAILED" in result["errors"][0]["error"]
@pytest.mark.asyncio
async def test_aborted_run_goes_to_errors(self, mock_client):
run_info = MagicMock()
run_info.status = "ABORTED"
mock_client.run.return_value.get.return_value = run_info
from plugins.apify.tools import _collect_handler
result = await _collect_handler({
"runs": [{"run_id": "r1", "actor_id": "apify~test", "dataset_id": "d1"}]
})
assert result["errors"][0]["status"] == "ABORTED"
@pytest.mark.asyncio
async def test_run_not_found_goes_to_errors(self, mock_client):
mock_client.run.return_value.get.return_value = None
from plugins.apify.tools import _collect_handler
result = await _collect_handler({
"runs": [{"run_id": "r1", "actor_id": "apify~test", "dataset_id": "d1"}]
})
assert "not found" in result["errors"][0]["error"]
@pytest.mark.asyncio
async def test_label_preserved_in_pending(self, mock_client):
run_info = MagicMock()
run_info.status = "RUNNING"
mock_client.run.return_value.get.return_value = run_info
from plugins.apify.tools import _collect_handler
result = await _collect_handler({
"runs": [{"run_id": "r1", "actor_id": "apify~test", "dataset_id": "d1", "label": "instagram"}]
})
assert result["pending"][0]["label"] == "instagram"
# ---------------------------------------------------------------------------
# apify_collect — succeeded path + external content wrapping
# ---------------------------------------------------------------------------
class TestCollectSucceeded:
@pytest.mark.asyncio
async def test_succeeded_fetches_dataset_and_wraps_content(self, mock_client):
run_info = MagicMock()
run_info.status = "SUCCEEDED"
mock_client.run.return_value.get.return_value = run_info
items = [{"title": "Result 1", "url": "https://example.com"}]
dataset_result = MagicMock()
dataset_result.items = items
mock_client.dataset.return_value.list_items.return_value = dataset_result
from plugins.apify.tools import _collect_handler
result = await _collect_handler({
"runs": [{"run_id": "r1", "actor_id": "apify~test", "dataset_id": "d1"}]
})
assert result["all_done"] is True
assert len(result["completed"]) == 1
c = result["completed"][0]
assert c["status"] == "SUCCEEDED"
assert c["result_count"] == 1
assert "<<<EXTERNAL_UNTRUSTED_CONTENT>>>" in c["data"]
assert "<<<END_EXTERNAL_UNTRUSTED_CONTENT>>>" in c["data"]
assert "Result 1" in c["data"]
from plugins.apify.tools import _COLLECT_DEFAULT_LIMIT
mock_client.dataset.return_value.list_items.assert_called_once_with(limit=_COLLECT_DEFAULT_LIMIT)
mock_client.dataset.assert_called_once_with("d1")
@pytest.mark.asyncio
async def test_dataset_content_truncated_at_50000_chars(self, mock_client):
run_info = MagicMock()
run_info.status = "SUCCEEDED"
mock_client.run.return_value.get.return_value = run_info
# Create items whose JSON serialization exceeds 50k chars
items = [{"data": "x" * 1000} for _ in range(100)]
dataset_result = MagicMock()
dataset_result.items = items
mock_client.dataset.return_value.list_items.return_value = dataset_result
from plugins.apify.tools import _collect_handler
result = await _collect_handler({
"runs": [{"run_id": "r1", "actor_id": "apify~test", "dataset_id": "d1"}]
})
raw_data = result["completed"][0]["data"]
# Strip markers to check truncated content length
content = raw_data.replace("<<<EXTERNAL_UNTRUSTED_CONTENT>>>\n", "").replace(
"\n<<<END_EXTERNAL_UNTRUSTED_CONTENT>>>", ""
)
assert "[…truncated]" in content
assert len(content) <= 50_000 + len("\n\n[…truncated]")
@pytest.mark.asyncio
async def test_all_done_true_when_only_succeeded(self, mock_client):
run_info = MagicMock()
run_info.status = "SUCCEEDED"
mock_client.run.return_value.get.return_value = run_info
mock_client.dataset.return_value.list_items.return_value = MagicMock(items=[])
from plugins.apify.tools import _collect_handler
result = await _collect_handler({
"runs": [{"run_id": "r1", "actor_id": "apify~test", "dataset_id": "d1"}]
})
assert result["all_done"] is True
assert result["pending"] == []
@pytest.mark.asyncio
async def test_may_have_more_set_when_result_count_equals_limit(self, mock_client):
from plugins.apify.tools import _collect_handler, _COLLECT_DEFAULT_LIMIT
run_info = MagicMock()
run_info.status = "SUCCEEDED"
mock_client.run.return_value.get.return_value = run_info
items = [{"i": i} for i in range(_COLLECT_DEFAULT_LIMIT)]
mock_client.dataset.return_value.list_items.return_value = MagicMock(items=items)
result = await _collect_handler({
"runs": [{"run_id": "r1", "actor_id": "apify~test", "dataset_id": "d1"}]
})
assert result["completed"][0].get("may_have_more") is True
@pytest.mark.asyncio
async def test_may_have_more_absent_when_under_limit(self, mock_client):
run_info = MagicMock()
run_info.status = "SUCCEEDED"
mock_client.run.return_value.get.return_value = run_info
mock_client.dataset.return_value.list_items.return_value = MagicMock(items=[{"i": 0}])
from plugins.apify.tools import _collect_handler
result = await _collect_handler({
"runs": [{"run_id": "r1", "actor_id": "apify~test", "dataset_id": "d1"}]
})
assert "may_have_more" not in result["completed"][0]
@pytest.mark.asyncio
async def test_custom_limit_forwarded_to_sdk(self, mock_client):
run_info = MagicMock()
run_info.status = "SUCCEEDED"
mock_client.run.return_value.get.return_value = run_info
mock_client.dataset.return_value.list_items.return_value = MagicMock(items=[])
from plugins.apify.tools import _collect_handler
await _collect_handler({
"runs": [{"run_id": "r1", "actor_id": "apify~test", "dataset_id": "d1"}],
"limit": 500,
})
mock_client.dataset.return_value.list_items.assert_called_once_with(limit=500)
# ---------------------------------------------------------------------------
# apify_collect — mixed states, interrupted, and full workflow
# ---------------------------------------------------------------------------
class TestCollectMixed:
@pytest.mark.asyncio
async def test_mixed_pending_and_succeeded(self, mock_client):
run1_info = MagicMock()
run1_info.status = "RUNNING"
run2_info = MagicMock()
run2_info.status = "SUCCEEDED"
def _run_get_side_effect(run_id):
m = MagicMock()
if run_id == "r1":
m.get.return_value = run1_info
else:
m.get.return_value = run2_info
return m
mock_client.run.side_effect = _run_get_side_effect
mock_client.dataset.return_value.list_items.return_value = MagicMock(items=[{"result": 1}])
from plugins.apify.tools import _collect_handler
result = await _collect_handler({
"runs": [
{"run_id": "r1", "actor_id": "apify~a", "dataset_id": "d1"},
{"run_id": "r2", "actor_id": "apify~b", "dataset_id": "d2"},
]
})
assert result["all_done"] is False
assert len(result["pending"]) == 1
assert len(result["completed"]) == 1
assert result["pending"][0]["run_id"] == "r1"
assert result["completed"][0]["run_id"] == "r2"
@pytest.mark.asyncio
async def test_interrupted_returns_early(self, mock_client, monkeypatch):
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: True)
from plugins.apify.tools import _collect_handler
result = await _collect_handler({
"runs": [{"run_id": "r1", "actor_id": "apify~test", "dataset_id": "d1"}]
})
assert result == {"error": "Interrupted"}
mock_client.run.assert_not_called()
@pytest.mark.asyncio
async def test_api_exception_goes_to_errors(self, mock_client):
mock_client.run.return_value.get.side_effect = RuntimeError("API error")
from plugins.apify.tools import _collect_handler
result = await _collect_handler({
"runs": [{"run_id": "r1", "actor_id": "apify~test", "dataset_id": "d1"}]
})
assert result["all_done"] is True # no pending
assert "API error" in result["errors"][0]["error"]
class TestCollectFullWorkflow:
"""End-to-end: start two runs, collect until all done."""
@pytest.mark.asyncio
async def test_start_then_collect_workflow(self, mock_client):
# apify_start
run1 = MagicMock(id="r1", default_dataset_id="d1", status="QUEUED")
run2 = MagicMock(id="r2", default_dataset_id="d2", status="QUEUED")
mock_client.actor.return_value.start.side_effect = [run1, run2]
from plugins.apify.tools import _start_handler, _collect_handler
start_result = _start_handler({
"runs": [
{"actor_id": "apify~actor-a", "input": {}, "label": "a"},
{"actor_id": "apify~actor-b", "input": {}, "label": "b"},
]
})
assert len(start_result["runs"]) == 2
# First collect: both still running
run_info_running = MagicMock(status="RUNNING")
mock_client.run.return_value.get.return_value = run_info_running
collect1 = await _collect_handler({"runs": start_result["runs"]})
assert collect1["all_done"] is False
assert len(collect1["pending"]) == 2
# Second collect: both succeeded
run_info_done = MagicMock(status="SUCCEEDED")
mock_client.run.return_value.get.return_value = run_info_done
mock_client.dataset.return_value.list_items.return_value = MagicMock(items=[{"row": 1}])
collect2 = await _collect_handler({"runs": start_result["runs"]})
assert collect2["all_done"] is True
assert len(collect2["completed"]) == 2

View File

@@ -90,6 +90,7 @@ LAZY_DEPS: dict[str, tuple[str, ...]] = {
# ─── Web search backends ───────────────────────────────────────────────
"search.exa": ("exa-py==2.10.2",),
"search.firecrawl": ("firecrawl-py==4.17.0",),
"search.apify": ("apify-client==3.0.1",),
"search.parallel": ("parallel-web==0.4.2",),
# ─── TTS providers ─────────────────────────────────────────────────────

View File

@@ -274,6 +274,15 @@ TOOLSETS = {
"includes": [],
},
"apify": {
"description": (
"Apify Actor execution — discover Actors in the Store, "
"start runs, and collect results."
),
"tools": ["apify_discover", "apify_start", "apify_collect"],
"includes": [],
},
"discord": {
"description": "Discord read and participate tools (fetch messages, search members, create threads)",
"tools": ["discord"],