Files
hermes-agent/tests/tools/test_video_analyze.py

338 lines
13 KiB
Python
Raw Normal View History

feat: add video_analyze tool for native video understanding (#19301) * feat: add video_analyze tool for native video understanding Adds a video_analyze tool that sends video files to multimodal LLMs (e.g. Gemini) for analysis via the OpenRouter-compatible video_url content type. Mirrors vision_analyze in structure, error handling, and registration pattern. Key design: - Base64 encodes entire video (no frame extraction, no ffmpeg dep) - Uses 'video_url' content block type (OpenRouter standard) - Supports mp4, webm, mov, avi, mkv, mpeg formats - 50 MB hard cap, 20 MB warning threshold - 180s minimum timeout (videos take longer than images) - AUXILIARY_VIDEO_MODEL env override, falls back to AUXILIARY_VISION_MODEL - Same SSRF protection, retry logic, and cleanup as vision_analyze Default disabled: registered in 'video' toolset (not in _HERMES_CORE_TOOLS). Users opt in via: hermes tools enable video, or enabled_toolsets=['video']. * feat(video): add models.dev capability pre-check + CONFIGURABLE_TOOLSETS entry - Pre-checks model video capability via models.dev modalities.input before expensive base64 encoding. Fails early with helpful message suggesting video-capable alternatives (gemini, mimo-v2.5-pro). - Passes optimistically if model unknown or lookup fails. - Adds ModelInfo.supports_video_input() helper. - Adds 'video' to CONFIGURABLE_TOOLSETS and _DEFAULT_OFF_TOOLSETS so 'hermes tools enable video' works from CLI. - 8 new tests for the capability check (37 total). * refactor(video): remove models.dev capability pre-check Removes _check_video_model_capability and ModelInfo.supports_video_input. The vision_analyze tool doesn't pre-check image capability either — both tools rely on the same pattern: send request, handle API errors gracefully with categorized user-facing messages. The pre-check was inconsistent (only worked for some providers/models) so drop it for parity. 
* cleanup: compress comments, fix fragile timeout coupling - Replace _VISION_DOWNLOAD_TIMEOUT * 2 with hardcoded 60s (no silent breakage if vision timeout changes independently) - Strip verbose comments and redundant log lines throughout - No behavioral changes
2026-05-04 00:04:36 +05:30
"""Tests for video_analyze tool in tools/vision_tools.py."""
import asyncio
import json
import os
from pathlib import Path
from typing import Awaitable
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from tools.vision_tools import (
_detect_video_mime_type,
_video_to_base64_data_url,
_handle_video_analyze,
_MAX_VIDEO_BASE64_BYTES,
_VIDEO_MIME_TYPES,
_VIDEO_SIZE_WARN_BYTES,
video_analyze_tool,
VIDEO_ANALYZE_SCHEMA,
)
# ---------------------------------------------------------------------------
# _detect_video_mime_type
# ---------------------------------------------------------------------------
class TestDetectVideoMimeType:
    """Verify that a video's MIME type is resolved from its file extension."""

    @staticmethod
    def _mime_for(directory, filename):
        """Write a 10-byte placeholder called *filename* and return its detected MIME type."""
        video = directory / filename
        video.write_bytes(b"\x00" * 10)
        return _detect_video_mime_type(video)

    def test_mp4(self, tmp_path):
        assert self._mime_for(tmp_path, "clip.mp4") == "video/mp4"

    def test_webm(self, tmp_path):
        assert self._mime_for(tmp_path, "clip.webm") == "video/webm"

    def test_mov(self, tmp_path):
        assert self._mime_for(tmp_path, "clip.mov") == "video/mov"

    def test_avi_fallback_mp4(self, tmp_path):
        # .avi is expected to be reported as video/mp4 rather than its own type.
        assert self._mime_for(tmp_path, "clip.avi") == "video/mp4"

    def test_mkv_fallback_mp4(self, tmp_path):
        # .mkv is expected to be reported as video/mp4 rather than its own type.
        assert self._mime_for(tmp_path, "clip.mkv") == "video/mp4"

    def test_mpeg(self, tmp_path):
        assert self._mime_for(tmp_path, "clip.mpeg") == "video/mpeg"

    def test_mpg(self, tmp_path):
        # The short .mpg spelling shares the video/mpeg type.
        assert self._mime_for(tmp_path, "clip.mpg") == "video/mpeg"

    def test_unsupported_extension(self, tmp_path):
        # Extensions outside the supported set yield None.
        assert self._mime_for(tmp_path, "clip.flv") is None

    def test_case_insensitive(self, tmp_path):
        # An upper-case extension resolves the same as its lower-case form.
        assert self._mime_for(tmp_path, "clip.MP4") == "video/mp4"
# ---------------------------------------------------------------------------
# _video_to_base64_data_url
# ---------------------------------------------------------------------------
class TestVideoToBase64DataUrl:
    """Verify the data-URL prefix produced when a video file is base64 encoded."""

    @staticmethod
    def _make_video(directory, filename):
        """Create a 4-byte file called *filename* in *directory* and return its path."""
        video = directory / filename
        video.write_bytes(b"\x00\x01\x02\x03")
        return video

    def test_produces_data_url(self, tmp_path):
        data_url = _video_to_base64_data_url(self._make_video(tmp_path, "test.mp4"))
        assert data_url.startswith("data:video/mp4;base64,")

    def test_custom_mime_type(self, tmp_path):
        source = self._make_video(tmp_path, "test.webm")
        data_url = _video_to_base64_data_url(source, mime_type="video/webm")
        assert data_url.startswith("data:video/webm;base64,")

    def test_default_mime_for_unknown_ext(self, tmp_path):
        data_url = _video_to_base64_data_url(self._make_video(tmp_path, "test.xyz"))
        # With no explicit mime_type, an unrecognised extension is labelled video/mp4.
        assert data_url.startswith("data:video/mp4;base64,")
# ---------------------------------------------------------------------------
# Schema validation
# ---------------------------------------------------------------------------
class TestVideoAnalyzeSchema:
    """Sanity checks on the shape of ``VIDEO_ANALYZE_SCHEMA``."""

    def test_schema_name(self):
        assert VIDEO_ANALYZE_SCHEMA["name"] == "video_analyze"

    def test_schema_has_required_fields(self):
        parameters = VIDEO_ANALYZE_SCHEMA["parameters"]
        properties = parameters["properties"]
        for field in ("video_url", "question"):
            assert field in properties
        # Both arguments are mandatory, listed in this exact order.
        assert parameters["required"] == ["video_url", "question"]

    def test_schema_description_mentions_video(self):
        description = VIDEO_ANALYZE_SCHEMA["description"]
        assert "video" in description.lower()
# ---------------------------------------------------------------------------
# _handle_video_analyze handler
# ---------------------------------------------------------------------------
class TestHandleVideoAnalyze:
    """Tests for the registry handler wrapper around ``video_analyze_tool``."""

    @staticmethod
    def _run(coro):
        """Drive *coro* to completion on a private event loop and return its result.

        ``asyncio.get_event_loop()`` is deprecated outside a running loop and
        raises ``RuntimeError`` once the process-wide loop has been cleared
        (for example after an earlier ``asyncio.run()``), which would make
        these tests depend on execution order. A fresh loop that is closed
        afterwards keeps each test self-contained and leaves the global loop
        state untouched.
        """
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(coro)
        finally:
            loop.close()

    def test_returns_awaitable(self, tmp_path, monkeypatch):
        """The handler hands back a coroutine rather than a finished result."""
        video_file = tmp_path / "test.mp4"
        video_file.write_bytes(b"\x00" * 100)
        monkeypatch.setenv("AUXILIARY_VIDEO_MODEL", "")
        monkeypatch.setenv("AUXILIARY_VISION_MODEL", "")
        with patch("tools.vision_tools.video_analyze_tool", new_callable=AsyncMock) as mock_tool:
            mock_tool.return_value = json.dumps({"success": True, "analysis": "test"})
            result = _handle_video_analyze(
                {"video_url": str(video_file), "question": "what is this?"}
            )
            assert asyncio.iscoroutine(result)
            # Close the never-awaited coroutine so no "never awaited" warning is emitted.
            result.close()

    def test_uses_auxiliary_video_model_env(self, tmp_path, monkeypatch):
        """AUXILIARY_VIDEO_MODEL wins over AUXILIARY_VISION_MODEL when both are set."""
        monkeypatch.setenv("AUXILIARY_VIDEO_MODEL", "google/gemini-2.5-flash")
        monkeypatch.setenv("AUXILIARY_VISION_MODEL", "other-model")
        with patch("tools.vision_tools.video_analyze_tool", new_callable=AsyncMock) as mock_tool:
            mock_tool.return_value = json.dumps({"success": True, "analysis": "ok"})
            self._run(
                _handle_video_analyze({"video_url": "/tmp/test.mp4", "question": "test"})
            )
            # The model is checked as the third positional argument of the tool call.
            args = mock_tool.call_args[0]
            assert args[2] == "google/gemini-2.5-flash"

    def test_falls_back_to_vision_model_env(self, tmp_path, monkeypatch):
        """An empty AUXILIARY_VIDEO_MODEL falls back to AUXILIARY_VISION_MODEL."""
        monkeypatch.setenv("AUXILIARY_VIDEO_MODEL", "")
        monkeypatch.setenv("AUXILIARY_VISION_MODEL", "google/gemini-flash")
        with patch("tools.vision_tools.video_analyze_tool", new_callable=AsyncMock) as mock_tool:
            mock_tool.return_value = json.dumps({"success": True, "analysis": "ok"})
            self._run(
                _handle_video_analyze({"video_url": "/tmp/test.mp4", "question": "test"})
            )
            # The model is checked as the third positional argument of the tool call.
            args = mock_tool.call_args[0]
            assert args[2] == "google/gemini-flash"
# ---------------------------------------------------------------------------
# video_analyze_tool — integration-style tests with mocked LLM
# ---------------------------------------------------------------------------
class TestVideoAnalyzeTool:
    """Core ``video_analyze_tool`` tests with the LLM call mocked out."""

    def _run(self, coro):
        """Drive *coro* to completion on a private event loop and return its result.

        ``asyncio.get_event_loop()`` is deprecated outside a running loop and
        raises ``RuntimeError`` once the process-wide loop has been cleared
        (for example after an earlier ``asyncio.run()``). A fresh loop that is
        closed afterwards keeps each test independent of global loop state.
        """
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(coro)
        finally:
            loop.close()

    @staticmethod
    def _llm_response(text):
        """Build a mock chat-completion response whose first choice carries *text*."""
        response = MagicMock()
        response.choices = [MagicMock()]
        response.choices[0].message.content = text
        return response

    def test_local_file_success(self, tmp_path, monkeypatch):
        """Analyze a local video file — happy path."""
        video = tmp_path / "demo.mp4"
        video.write_bytes(b"\x00" * 1024)
        mock_response = self._llm_response("A short video showing a demo.")
        with patch(
            "tools.vision_tools.async_call_llm",
            new_callable=AsyncMock,
            return_value=mock_response,
        ), patch(
            "tools.vision_tools.extract_content_or_reasoning",
            return_value="A short video showing a demo.",
        ):
            result = self._run(video_analyze_tool(str(video), "What is this?"))
        data = json.loads(result)
        assert data["success"] is True
        assert "demo" in data["analysis"].lower()

    def test_local_file_not_found(self, tmp_path):
        """A non-existent file yields a failure payload, not an exception."""
        result = self._run(video_analyze_tool("/nonexistent/video.mp4", "What?"))
        data = json.loads(result)
        assert data["success"] is False
        assert "invalid video source" in data["analysis"].lower()

    def test_unsupported_format(self, tmp_path):
        """An unsupported extension yields a failure payload naming the problem."""
        video = tmp_path / "clip.flv"
        video.write_bytes(b"\x00" * 100)
        result = self._run(video_analyze_tool(str(video), "What is this?"))
        data = json.loads(result)
        assert data["success"] is False
        assert "unsupported video format" in data["analysis"].lower()

    def test_video_too_large(self, tmp_path, monkeypatch):
        """An encoded video exceeding the maximum size is rejected."""
        video = tmp_path / "huge.mp4"
        # Keep the file on disk tiny; the oversized payload is faked by
        # patching the encoder instead of writing a huge file.
        video.write_bytes(b"\x00" * 100)
        oversized = "data:video/mp4;base64," + "A" * (_MAX_VIDEO_BASE64_BYTES + 1)
        with patch("tools.vision_tools._video_to_base64_data_url") as mock_encode:
            mock_encode.return_value = oversized
            result = self._run(video_analyze_tool(str(video), "What?"))
        data = json.loads(result)
        assert data["success"] is False
        assert "too large" in data["analysis"].lower()

    def test_interrupt_check(self, tmp_path):
        """Tool reports failure when the interrupt flag is set."""
        video = tmp_path / "test.mp4"
        video.write_bytes(b"\x00" * 100)
        with patch("tools.interrupt.is_interrupted", return_value=True):
            result = self._run(video_analyze_tool(str(video), "What?"))
        data = json.loads(result)
        assert data["success"] is False

    def test_empty_response_retries(self, tmp_path):
        """Retries once on empty model response."""
        video = tmp_path / "test.mp4"
        video.write_bytes(b"\x00" * 100)
        call_count = 0
        mock_response = self._llm_response("Video analysis result.")

        async def fake_llm(**kwargs):
            nonlocal call_count
            call_count += 1
            return mock_response

        # The first extraction comes back empty and the second carries the
        # text, so the tool has to call the LLM a second time.
        with patch("tools.vision_tools.async_call_llm", side_effect=fake_llm), patch(
            "tools.vision_tools.extract_content_or_reasoning",
            side_effect=["", "Video analysis result."],
        ):
            result = self._run(video_analyze_tool(str(video), "What?"))
        data = json.loads(result)
        assert data["success"] is True
        assert call_count == 2  # Initial call + retry

    def test_file_scheme_stripped(self, tmp_path):
        """A file:// URL pointing at a local video is analyzed successfully."""
        video = tmp_path / "test.mp4"
        video.write_bytes(b"\x00" * 100)
        mock_response = self._llm_response("OK")
        with patch(
            "tools.vision_tools.async_call_llm",
            new_callable=AsyncMock,
            return_value=mock_response,
        ), patch("tools.vision_tools.extract_content_or_reasoning", return_value="OK"):
            result = self._run(video_analyze_tool(f"file://{video}", "What?"))
        data = json.loads(result)
        assert data["success"] is True

    def test_api_message_format(self, tmp_path):
        """Verify the message sent to LLM uses video_url content type."""
        video = tmp_path / "test.mp4"
        video.write_bytes(b"\x00" * 100)
        captured_kwargs = {}
        llm_response = self._llm_response

        async def capture_llm(**kwargs):
            # Record the keyword arguments so the request shape can be inspected.
            captured_kwargs.update(kwargs)
            return llm_response("OK")

        with patch("tools.vision_tools.async_call_llm", side_effect=capture_llm), patch(
            "tools.vision_tools.extract_content_or_reasoning", return_value="OK"
        ):
            self._run(video_analyze_tool(str(video), "Describe this"))
        messages = captured_kwargs["messages"]
        assert len(messages) == 1
        content = messages[0]["content"]
        # One text part followed by one video_url part carrying a data URL.
        assert len(content) == 2
        assert content[0]["type"] == "text"
        assert content[1]["type"] == "video_url"
        assert "video_url" in content[1]
        assert content[1]["video_url"]["url"].startswith("data:video/mp4;base64,")
# ---------------------------------------------------------------------------
# Toolset registration
# ---------------------------------------------------------------------------
class TestVideoToolsetRegistration:
    """Registration checks for the ``video_analyze`` tool and its toolset."""

    def test_registered_in_video_toolset(self):
        from tools.registry import registry

        registered = registry.get_entry("video_analyze")
        assert registered is not None
        # Registered as an async tool in the "video" toolset with the clapper emoji.
        assert registered.toolset == "video"
        assert registered.is_async is True
        assert registered.emoji == "🎬"

    def test_not_in_core_tools(self):
        """video_analyze must stay out of _HERMES_CORE_TOOLS (disabled by default)."""
        from toolsets import _HERMES_CORE_TOOLS as core_tools

        assert "video_analyze" not in core_tools

    def test_in_video_toolset_definition(self):
        """The 'video' toolset exists and lists video_analyze among its tools."""
        from toolsets import TOOLSETS as toolsets

        assert "video" in toolsets
        video_tools = toolsets["video"]["tools"]
        assert "video_analyze" in video_tools