mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-10 20:29:00 +08:00
Compare commits
1 Commits
fix/docker
...
kilocode-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ff7f01375b |
295
tests/tools/test_read_extract.py
Normal file
295
tests/tools/test_read_extract.py
Normal file
@@ -0,0 +1,295 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for structured-document extraction in the read_file tool.
|
||||
|
||||
Covers .ipynb / .docx / .xlsx extraction (ported from Kilo-Org/kilocode
|
||||
#10733, #10737, #10740) and the read_file_tool integration: pagination,
|
||||
line-numbering, graceful fallback on malformed input, and hidden-sheet
|
||||
omission.
|
||||
|
||||
Run with: python -m pytest tests/tools/test_read_extract.py -v
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
import zipfile
|
||||
|
||||
from tools.read_extract import (
|
||||
ExtractionError,
|
||||
extract_document_text,
|
||||
is_extractable_document,
|
||||
)
|
||||
from tools.file_tools import read_file_tool
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixture builders — construct minimal valid OOXML / notebook files.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _write_notebook(path, cells, nbformat=4):
    """Write a minimal notebook JSON document holding ``cells`` to ``path``.

    The notebook carries empty metadata, the requested ``nbformat`` and a
    fixed ``nbformat_minor`` of 5.
    """
    document = dict(cells=cells, metadata={}, nbformat=nbformat, nbformat_minor=5)
    serialized = json.dumps(document)
    with open(path, "w", encoding="utf-8") as handle:
        handle.write(serialized)
|
||||
|
||||
|
||||
def _write_docx(path, document_xml):
    """Create a bare-bones DOCX archive at ``path``.

    The archive holds a stub ``[Content_Types].xml`` plus ``document_xml``
    stored as ``word/document.xml``.
    """
    members = (
        ("[Content_Types].xml", "<Types/>"),
        ("word/document.xml", document_xml),
    )
    with zipfile.ZipFile(path, "w") as archive:
        for member_name, payload in members:
            archive.writestr(member_name, payload)
|
||||
|
||||
|
||||
def _write_xlsx(path, *, workbook, rels, shared, sheets):
    """Create a minimal XLSX archive at ``path``.

    ``workbook`` and ``rels`` are the XML for ``xl/workbook.xml`` and its
    relationships part. ``shared`` is the shared-strings XML, or ``None`` to
    omit that part. ``sheets`` maps part names to worksheet XML strings.
    """
    parts = [
        ("xl/workbook.xml", workbook),
        ("xl/_rels/workbook.xml.rels", rels),
    ]
    if shared is not None:
        parts.append(("xl/sharedStrings.xml", shared))
    parts.extend(sheets.items())
    with zipfile.ZipFile(path, "w") as archive:
        for part_name, xml_text in parts:
            archive.writestr(part_name, xml_text)
|
||||
|
||||
|
||||
_NS_W = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
|
||||
_NS_S = "http://schemas.openxmlformats.org/spreadsheetml/2006/main"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# is_extractable_document
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestIsExtractable(unittest.TestCase):
    """Extension-based detection of documents the extractor can render."""

    def test_recognized_extensions(self):
        # Includes an upper-case extension on an absolute path.
        for candidate in ("a.ipynb", "/x/B.DOCX", "report.xlsx"):
            with self.subTest(path=candidate):
                self.assertTrue(is_extractable_document(candidate))

    def test_unrecognized_extensions(self):
        for candidate in ("a.py", "a.pdf", "a.txt"):
            with self.subTest(path=candidate):
                self.assertFalse(is_extractable_document(candidate))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Notebooks (.ipynb) — #10733
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestNotebookExtraction(unittest.TestCase):
    """Text rendering of Jupyter notebooks and its failure modes."""

    def setUp(self):
        import shutil

        # Private scratch directory per test, removed again afterwards.
        self.tmp = tempfile.mkdtemp(prefix="rex_nb_")
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)

    def test_markdown_and_code_in_order(self):
        nb_path = os.path.join(self.tmp, "nb.ipynb")
        markdown_cell = {"cell_type": "markdown", "source": ["# Title\n", "para"]}
        code_cell = {
            "cell_type": "code",
            "source": "x = 1\nprint(x)",
            "outputs": [{"output_type": "stream", "text": ["1\n"]}],
            "execution_count": 1,
        }
        _write_notebook(nb_path, [markdown_cell, code_cell])
        rendered = extract_document_text(nb_path)
        for expected in ("# Title", "print(x)"):
            self.assertIn(expected, rendered)
        # Output payloads and execution metadata must not leak into the text.
        for leaked in ("output_type", "execution_count"):
            self.assertNotIn(leaked, rendered)
        # Document order is kept: the markdown cell precedes the code cell.
        self.assertLess(rendered.index("Title"), rendered.index("print(x)"))

    def test_string_source_form(self):
        nb_path = os.path.join(self.tmp, "nb2.ipynb")
        only_cell = {"cell_type": "code", "source": "single string source"}
        _write_notebook(nb_path, [only_cell])
        rendered = extract_document_text(nb_path)
        self.assertIn("single string source", rendered)

    def test_legacy_worksheets_form(self):
        nb_path = os.path.join(self.tmp, "nb3.ipynb")
        legacy_cell = {"cell_type": "code", "input": "ignored", "source": "legacy cell"}
        legacy_nb = {"worksheets": [{"cells": [legacy_cell]}], "nbformat": 3}
        with open(nb_path, "w") as handle:
            json.dump(legacy_nb, handle)
        rendered = extract_document_text(nb_path)
        self.assertIn("legacy cell", rendered)

    def test_malformed_notebook_raises(self):
        nb_path = os.path.join(self.tmp, "bad.ipynb")
        with open(nb_path, "w") as handle:
            handle.write("{ not valid json")
        self.assertRaises(ExtractionError, extract_document_text, nb_path)

    def test_empty_cells_raises(self):
        nb_path = os.path.join(self.tmp, "empty.ipynb")
        _write_notebook(nb_path, [])
        self.assertRaises(ExtractionError, extract_document_text, nb_path)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Word documents (.docx) — #10737
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDocxExtraction(unittest.TestCase):
    """Paragraph/run extraction from DOCX archives and its failure modes."""

    def setUp(self):
        import shutil

        # Private scratch directory per test, removed again afterwards.
        self.tmp = tempfile.mkdtemp(prefix="rex_docx_")
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)

    def _doc(self, body):
        """Wrap ``body`` in a minimal WordprocessingML document envelope."""
        opening = f'<?xml version="1.0"?><w:document xmlns:w="{_NS_W}">'
        return f"{opening}<w:body>{body}</w:body></w:document>"

    def test_paragraphs_and_runs(self):
        docx_path = os.path.join(self.tmp, "d.docx")
        first_para = '<w:p><w:r><w:t>Hello </w:t></w:r><w:r><w:t>World</w:t></w:r></w:p>'
        second_para = '<w:p><w:r><w:t>Second</w:t></w:r></w:p>'
        _write_docx(docx_path, self._doc(first_para + second_para))
        rendered = extract_document_text(docx_path)
        for expected in ("Hello World", "Second"):
            self.assertIn(expected, rendered)

    def test_tabs_and_breaks(self):
        docx_path = os.path.join(self.tmp, "d2.docx")
        body = '<w:p><w:r><w:t>A</w:t><w:tab/><w:t>B</w:t><w:br/><w:t>C</w:t></w:r></w:p>'
        _write_docx(docx_path, self._doc(body))
        rendered = extract_document_text(docx_path)
        self.assertIn("A\tB", rendered)
        self.assertIn("C", rendered)

    def test_not_a_zip_raises(self):
        docx_path = os.path.join(self.tmp, "bad.docx")
        with open(docx_path, "wb") as handle:
            handle.write(b"plain bytes, not a zip")
        self.assertRaises(ExtractionError, extract_document_text, docx_path)

    def test_missing_document_xml_raises(self):
        docx_path = os.path.join(self.tmp, "nodoc.docx")
        with zipfile.ZipFile(docx_path, "w") as archive:
            archive.writestr("other.xml", "<x/>")
        self.assertRaises(ExtractionError, extract_document_text, docx_path)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Excel workbooks (.xlsx) — #10740
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestXlsxExtraction(unittest.TestCase):
    """Worksheet extraction from XLSX archives, including hidden-sheet omission."""

    def setUp(self):
        import shutil

        # Private scratch directory per test, removed again afterwards.
        self.tmp = tempfile.mkdtemp(prefix="rex_xlsx_")
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)

    def _build(self, path, *, include_hidden=True):
        """Write a two-sheet workbook to ``path``.

        Sheet "Data" is visible and holds a header row plus one data row.
        Sheet "Hidden" is declared with ``state="hidden"`` and is only listed
        in the workbook when ``include_hidden`` is true; its part is always
        written to the archive.
        """
        r = "http://schemas.openxmlformats.org/officeDocument/2006/relationships"

        def worksheet(rows_xml):
            return f'<worksheet xmlns="{_NS_S}"><sheetData>{rows_xml}</sheetData></worksheet>'

        if include_hidden:
            hidden_sheet = (
                '<sheet name="Hidden" sheetId="2" state="hidden" '
                f'xmlns:r="{r}" r:id="rId2"/>'
            )
        else:
            hidden_sheet = ""

        workbook = "".join([
            f'<workbook xmlns="{_NS_S}" xmlns:r="{r}"><sheets>',
            f'<sheet name="Data" sheetId="1" r:id="rId1"/>{hidden_sheet}',
            '</sheets></workbook>',
        ])
        rels = "".join([
            '<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">',
            '<Relationship Id="rId1" Target="worksheets/sheet1.xml" Type="x"/>',
            '<Relationship Id="rId2" Target="worksheets/sheet2.xml" Type="x"/>',
            '</Relationships>',
        ])
        shared = (
            f'<sst xmlns="{_NS_S}"><si><t>Name</t></si><si><t>Score</t></si>'
            '<si><t>Alice</t></si></sst>'
        )
        sheet1 = worksheet(
            '<row r="1"><c r="A1" t="s"><v>0</v></c><c r="B1" t="s"><v>1</v></c></row>'
            '<row r="2"><c r="A2" t="s"><v>2</v></c><c r="B2"><v>95</v></c></row>'
        )
        sheet2 = worksheet('<row r="1"><c r="A1" t="str"><v>SECRETDATA</v></c></row>')
        _write_xlsx(
            path,
            workbook=workbook,
            rels=rels,
            shared=shared,
            sheets={
                "xl/worksheets/sheet1.xml": sheet1,
                "xl/worksheets/sheet2.xml": sheet2,
            },
        )

    def test_visible_sheet_content(self):
        xlsx_path = os.path.join(self.tmp, "wb.xlsx")
        self._build(xlsx_path)
        rendered = extract_document_text(xlsx_path)
        self.assertIn("Data", rendered)  # sheet label
        self.assertIn("Name\tScore", rendered)  # shared-string header row
        self.assertIn("Alice\t95", rendered)  # string + numeric cells

    def test_hidden_sheet_omitted(self):
        xlsx_path = os.path.join(self.tmp, "wb2.xlsx")
        self._build(xlsx_path)
        rendered = extract_document_text(xlsx_path)
        for forbidden in ("SECRETDATA", "Hidden"):
            self.assertNotIn(forbidden, rendered)

    def test_not_a_zip_raises(self):
        xlsx_path = os.path.join(self.tmp, "bad.xlsx")
        with open(xlsx_path, "wb") as handle:
            handle.write(b"nope")
        self.assertRaises(ExtractionError, extract_document_text, xlsx_path)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# read_file_tool integration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestReadFileToolIntegration(unittest.TestCase):
    """End-to-end checks that ``read_file_tool`` routes documents through extraction."""

    def setUp(self):
        self.tmp = tempfile.mkdtemp(prefix="rex_int_")

    def tearDown(self):
        import shutil

        shutil.rmtree(self.tmp, ignore_errors=True)

    def test_notebook_read_is_line_numbered(self):
        p = os.path.join(self.tmp, "nb.ipynb")
        _write_notebook(p, [
            {"cell_type": "markdown", "source": "# H"},
            {"cell_type": "code", "source": "print(1)"},
        ])
        res = json.loads(read_file_tool(p))
        self.assertTrue(res.get("extracted_document"))
        self.assertIn("1|", res["content"])  # line-number gutter
        self.assertIn("print(1)", res["content"])

    def test_pagination(self):
        p = os.path.join(self.tmp, "nb.ipynb")
        _write_notebook(p, [
            {"cell_type": "code", "source": "a\nb\nc\nd\ne\nf"},
        ])
        res = json.loads(read_file_tool(p, offset=1, limit=2))
        self.assertTrue(res.get("truncated"))
        self.assertIn("offset=3", res.get("hint", ""))
        # Only the first 2 extracted lines (cell header + "a") are present;
        # assert both the presence of line 2 and the absence of line 3 so the
        # page boundary is actually verified.
        self.assertIn("1|# ── Code cell 1 ──", res["content"])
        self.assertIn("2|a", res["content"])
        self.assertNotIn("3|", res["content"])
        # Header line plus six source lines.
        self.assertEqual(res["total_lines"], 7)

    def test_corrupt_docx_falls_through_to_binary_guard(self):
        p = os.path.join(self.tmp, "bad.docx")
        with open(p, "wb") as fh:
            fh.write(b"not a zip")
        res = json.loads(read_file_tool(p))
        # Should NOT crash; falls through to the binary-extension guard.
        self.assertIn("error", res)
        self.assertIn("binary", res["error"].lower())

    def test_docx_read_extracts(self):
        p = os.path.join(self.tmp, "d.docx")
        _write_docx(p, (f'<?xml version="1.0"?><w:document xmlns:w="{_NS_W}">'
                        '<w:body><w:p><w:r><w:t>Report body</w:t></w:r></w:p>'
                        '</w:body></w:document>'))
        res = json.loads(read_file_tool(p))
        self.assertTrue(res.get("extracted_document"))
        self.assertIn("Report body", res["content"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -7,9 +7,15 @@ import logging
|
||||
import os
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from agent.file_safety import get_read_block_error
|
||||
from tools.binary_extensions import has_binary_extension
|
||||
from tools.read_extract import (
|
||||
ExtractionError,
|
||||
extract_document_text,
|
||||
is_extractable_document,
|
||||
)
|
||||
from tools.file_operations import (
|
||||
ShellFileOperations,
|
||||
normalize_read_pagination,
|
||||
@@ -626,6 +632,87 @@ def clear_file_ops_cache(task_id: str = None):
|
||||
_file_ops_cache.clear()
|
||||
|
||||
|
||||
def _read_extracted_document(
    resolved_str: str,
    display_path: str,
    offset: int,
    limit: int,
    task_id: str = "default",
) -> Optional[str]:
    """Serve a structured document (.ipynb/.docx/.xlsx) as a paginated read.

    The document is extracted to plain text, sliced to the requested page,
    line-numbered with the shared formatter, checked against the read
    char-limit and finally redacted.

    Args:
        resolved_str: Path handed to the extractor.
        display_path: Path echoed back in error payloads and log messages.
        offset: 1-indexed first line of the page.
        limit: Maximum number of lines in the page.
        task_id: Selects the file-operations instance used for numbering.

    Returns:
        The JSON tool result, or ``None`` when extraction failed so the
        caller can fall through to the normal read path.
    """
    try:
        extracted = extract_document_text(resolved_str)
    except ExtractionError:
        # Malformed/unsupported in practice; the normal path handles it.
        return None
    except Exception:
        logger.debug("document extraction failed for %s", display_path, exc_info=True)
        return None

    all_lines = extracted.split("\n")
    # A trailing newline leaves one empty final element; it is not a real line.
    if all_lines and not all_lines[-1]:
        del all_lines[-1]
    total_lines = len(all_lines)

    first_idx = offset - 1  # offset is 1-indexed
    end_line = first_idx + limit
    page_text = "\n".join(all_lines[first_idx:end_line])
    truncated = end_line < total_lines

    # The shared formatter keeps the output shape identical to a normal read.
    file_ops = _get_file_ops(task_id)
    numbered = file_ops._add_line_numbers(page_text, offset) if page_text else ""

    # Same safety limit as the normal read path.
    max_chars = _get_max_read_chars()
    if len(numbered) > max_chars:
        message = (
            f"Read produced {len(numbered):,} characters which exceeds "
            f"the safety limit ({max_chars:,} chars). "
            "Use offset and limit to read a smaller range. "
            f"The document has {total_lines} lines of extracted text."
        )
        return json.dumps(
            {"error": message, "path": display_path, "total_lines": total_lines},
            ensure_ascii=False,
        )

    result_dict = {
        "content": redact_sensitive_text(numbered, code_file=True),
        "total_lines": total_lines,
        "truncated": truncated,
        "extracted_document": True,
    }
    if truncated:
        result_dict["hint"] = (
            f"Use offset={end_line + 1} to continue reading "
            f"(showing {offset}-{min(end_line, total_lines)} of {total_lines} lines)"
        )
    return json.dumps(result_dict, ensure_ascii=False)
|
||||
|
||||
|
||||
def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = "default") -> str:
|
||||
"""Read a file with pagination and line numbers."""
|
||||
try:
|
||||
@@ -644,6 +731,23 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
|
||||
|
||||
_resolved = _resolve_path_for_task(path, task_id)
|
||||
|
||||
# ── Structured-document extraction ────────────────────────────
|
||||
# .ipynb / .docx / .xlsx render to plain text in-process so the
|
||||
# agent can read their content directly. Raw .ipynb JSON drowns
|
||||
# the model in metadata + output payloads, and .docx/.xlsx are
|
||||
# otherwise rejected as binary. Extracted text flows through the
|
||||
# same pagination / line-numbering / char-limit / redaction
|
||||
# pipeline as a normal text read. Malformed documents fall back to
|
||||
# the normal read path (raw text / binary guard) so they stay
|
||||
# inspectable. Ported from Kilo-Org/kilocode #10733, #10737, #10740.
|
||||
if is_extractable_document(str(_resolved)):
|
||||
_doc_result = _read_extracted_document(
|
||||
str(_resolved), path, offset, limit, task_id
|
||||
)
|
||||
if _doc_result is not None:
|
||||
return _doc_result
|
||||
# else: extraction failed → fall through to normal read path.
|
||||
|
||||
# ── Binary file guard ─────────────────────────────────────────
|
||||
# Block binary files by extension (no I/O).
|
||||
if has_binary_extension(str(_resolved)):
|
||||
@@ -1311,7 +1415,7 @@ def _check_file_reqs():
|
||||
|
||||
READ_FILE_SCHEMA = {
|
||||
"name": "read_file",
|
||||
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Cannot read images or binary files — use vision_analyze for images.",
|
||||
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. Jupyter notebooks (.ipynb), Word documents (.docx), and Excel workbooks (.xlsx) are auto-extracted to readable text. NOTE: Cannot read images or other binary files — use vision_analyze for images.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
||||
409
tools/read_extract.py
Normal file
409
tools/read_extract.py
Normal file
@@ -0,0 +1,409 @@
|
||||
"""Document text extraction for the read_file tool.
|
||||
|
||||
Ported/adapted from Kilo-Org/kilocode PRs #10733 (notebooks), #10737 (DOCX),
|
||||
and #10740 (XLSX), which added structured-document reading to their CLI `read`
|
||||
tool. Kilo bundled the `mammoth` JS library for DOCX; hermes-agent instead uses
|
||||
a pure-stdlib approach (``json`` + ``zipfile`` + ``xml.etree``) so no new Python
|
||||
dependency is added — ``.docx`` and ``.xlsx`` are both Zip+OOXML containers that
|
||||
stdlib can unpack and parse.
|
||||
|
||||
The router (:func:`extract_document_text`) returns a plain-text rendering of the
|
||||
document. The caller (``read_file_tool``) then routes that text through the
|
||||
existing line-numbering, pagination, truncation, char-limit and redaction
|
||||
pipeline — exactly as it does for a normal text file. That keeps a single set of
|
||||
output semantics for every readable format.
|
||||
|
||||
Design constraints (from the hermes-agent-dev skill):
|
||||
* No new hard dependency. Everything here is stdlib.
|
||||
* Extraction reads local bytes directly (works regardless of terminal
|
||||
backend, since the file is resolved to a host path before we get here).
|
||||
* Malformed inputs degrade gracefully: callers fall back to raw-text reading
|
||||
so the file stays inspectable rather than throwing an opaque error.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import zipfile
|
||||
from typing import Optional
|
||||
from xml.etree import ElementTree as ET
|
||||
|
||||
__all__ = [
|
||||
"EXTRACTABLE_EXTENSIONS",
|
||||
"is_extractable_document",
|
||||
"extract_document_text",
|
||||
"ExtractionError",
|
||||
]
|
||||
|
||||
# Extensions we can render to text in-process. Lowercase, leading dot.
|
||||
EXTRACTABLE_EXTENSIONS = frozenset({".ipynb", ".docx", ".xlsx"})
|
||||
|
||||
# Workbook hard cap mirrors Kilo #10740 (reject >50 MB before parsing). Applied
|
||||
# by the caller via file size; re-stated here as the documented contract.
|
||||
MAX_XLSX_BYTES = 50 * 1024 * 1024
|
||||
|
||||
# Bound worksheet extraction so a pathological workbook can't blow up context
|
||||
# before the read tool's own char-limit guard runs. Generous — the read tool
|
||||
# truncates afterward anyway.
|
||||
_MAX_XLSX_ROWS_PER_SHEET = 5000
|
||||
_MAX_XLSX_COLS = 256
|
||||
|
||||
# OOXML namespaces.
|
||||
_NS_W = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
|
||||
_NS_S = "http://schemas.openxmlformats.org/spreadsheetml/2006/main"
|
||||
|
||||
|
||||
class ExtractionError(Exception):
    """Signal that a document could not be turned into text.

    Callers are expected to catch this and fall back to raw-text reading, so
    the file stays inspectable (this matches Kilo's malformed-notebook
    behavior).
    """
|
||||
|
||||
|
||||
def is_extractable_document(path: str) -> bool:
    """Report whether ``path`` ends with an extension we can render to text.

    The comparison is case-insensitive and uses ``EXTRACTABLE_EXTENSIONS``.
    """
    return path.lower().endswith(tuple(EXTRACTABLE_EXTENSIONS))
|
||||
|
||||
|
||||
def _ext_of(path: str) -> str:
    """Return the extractable extension ``path`` ends with, or ``""`` if none."""
    lowered = path.lower()
    return next(
        (ext for ext in EXTRACTABLE_EXTENSIONS if lowered.endswith(ext)),
        "",
    )
|
||||
|
||||
|
||||
def extract_document_text(path: str) -> str:
    """Turn a supported document into plain text.

    Args:
        path: Local filesystem path to a ``.ipynb`` / ``.docx`` / ``.xlsx`` file.

    Returns:
        Plain-text rendering suitable for line-numbered display.

    Raises:
        ExtractionError: if the extension is unsupported or the file cannot
            be parsed. The caller should fall back to raw-text reading.
    """
    ext = _ext_of(path)
    # One extractor per supported extension.
    extractors = {
        ".ipynb": _extract_notebook,
        ".docx": _extract_docx,
        ".xlsx": _extract_xlsx,
    }
    extractor = extractors.get(ext)
    if extractor is None:
        raise ExtractionError(f"Unsupported document type: {ext or path!r}")
    return extractor(path)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────
|
||||
# Jupyter notebooks (.ipynb) — Kilo #10733
|
||||
# ──────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def _extract_notebook(path: str) -> str:
    """Extract markdown, code and raw cell sources in document order.

    Raw ``.ipynb`` JSON drowns the model in metadata and output payloads
    (base64 images, execution counts, stream noise). Only the cell sources
    are kept, each under a ``# ── <kind> cell ──`` header line.

    Both layouts are supported: ``cells`` at the notebook root (nbformat 4)
    and ``worksheets[].cells`` (nbformat < 4). Legacy code cells keep their
    text under ``input`` rather than ``source``, so ``input`` is used when a
    cell has no ``source``.

    Args:
        path: Local filesystem path of the notebook.

    Returns:
        The rendered text, ending with exactly one newline.

    Raises:
        ExtractionError: if the file is unreadable, is not valid JSON, has no
            cells array, or has no markdown/code/raw cells.
    """
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as fh:
            nb = json.load(fh)
    except (json.JSONDecodeError, OSError, ValueError) as exc:
        raise ExtractionError(f"Not a valid notebook: {exc}") from exc

    if not isinstance(nb, dict):
        raise ExtractionError("Notebook root is not a JSON object")

    cells = nb.get("cells")
    if not isinstance(cells, list):
        # nbformat < 4 stored cells under worksheets[].cells.
        worksheets = nb.get("worksheets")
        if isinstance(worksheets, list) and worksheets:
            cells = []
            for ws in worksheets:
                if isinstance(ws, dict) and isinstance(ws.get("cells"), list):
                    cells.extend(ws["cells"])
        else:
            raise ExtractionError("Notebook has no cells array")

    parts: list[str] = []
    code_n = 0
    md_n = 0
    for cell in cells:
        if not isinstance(cell, dict):
            continue
        cell_type = cell.get("cell_type", "")
        if cell_type == "markdown":
            md_n += 1
            header = f"# ── Markdown cell {md_n} ──"
        elif cell_type == "code":
            code_n += 1
            header = f"# ── Code cell {code_n} ──"
        elif cell_type == "raw":
            header = "# ── Raw cell ──"
        else:
            # Unknown cell types are skipped silently.
            continue

        raw_source = cell.get("source")
        if raw_source is None:
            # Legacy (nbformat < 4) code cells store their text under "input".
            raw_source = cell.get("input", "")
        parts.append(header)
        parts.append(_join_source(raw_source).rstrip("\n"))
        parts.append("")

    if not parts:
        raise ExtractionError("Notebook contains no readable cells")

    return "\n".join(parts).rstrip("\n") + "\n"


def _join_source(source) -> str:
    """Flatten a notebook cell source into a single string.

    ``source`` is either a string or a list of line strings; non-string list
    items are dropped and any other type yields ``""``.
    """
    if isinstance(source, list):
        return "".join(s for s in source if isinstance(s, str))
    if isinstance(source, str):
        return source
    return ""
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────
|
||||
# Word documents (.docx) — Kilo #10737 (stdlib instead of mammoth)
|
||||
# ──────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def _extract_docx(path: str) -> str:
    """Extract paragraph text from a DOCX in document order.

    A ``.docx`` is a Zip container; the body text lives in
    ``word/document.xml`` as ``<w:p>`` paragraphs containing ``<w:t>`` text
    runs. Paragraphs are walked in order, their runs joined, and one line is
    emitted per paragraph. ``<w:tab>`` becomes a tab and ``<w:br>``/``<w:cr>``
    become newlines so basic layout survives.

    A paragraph can contain further paragraphs (for example text-box content
    anchored inside a run). Each paragraph contributes only its own text;
    nested paragraphs are emitted as separate lines after it, so their text
    is not repeated.

    Args:
        path: Local filesystem path of the document.

    Returns:
        The rendered text, ending with exactly one newline.

    Raises:
        ExtractionError: if the file is not a readable zip, lacks
            ``word/document.xml``, has malformed XML, or has no text.
    """
    try:
        with zipfile.ZipFile(path) as zf:
            try:
                xml_bytes = zf.read("word/document.xml")
            except KeyError as exc:
                raise ExtractionError("DOCX missing word/document.xml") from exc
    except (zipfile.BadZipFile, OSError, RuntimeError) as exc:
        # RuntimeError covers encrypted members and unsupported compression
        # methods, which zipfile reports outside the BadZipFile hierarchy.
        raise ExtractionError(f"Not a valid DOCX (zip) file: {exc}") from exc

    try:
        root = ET.fromstring(xml_bytes)
    except ET.ParseError as exc:
        raise ExtractionError(f"DOCX document.xml is malformed: {exc}") from exc

    w = "{%s}" % _NS_W
    p_tag = f"{w}p"
    t_tag = f"{w}t"
    tab_tag = f"{w}tab"
    break_tags = (f"{w}br", f"{w}cr")
    lines: list[str] = []

    # root.iter() yields every paragraph in document order, including ones
    # nested inside another paragraph.
    for para in root.iter(p_tag):
        buf: list[str] = []
        # Explicit pre-order walk so we can stop at nested paragraphs instead
        # of collecting their text twice.
        pending = list(para)[::-1]
        while pending:
            node = pending.pop()
            tag = node.tag
            if tag == p_tag:
                continue
            if tag == t_tag:
                buf.append(node.text or "")
            elif tag == tab_tag:
                buf.append("\t")
            elif tag in break_tags:
                buf.append("\n")
            pending.extend(list(node)[::-1])
        # A paragraph may itself contain explicit line breaks.
        lines.extend("".join(buf).split("\n"))

    if not any(line.strip() for line in lines):
        raise ExtractionError("DOCX contains no extractable text")

    return "\n".join(lines).rstrip("\n") + "\n"
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────
|
||||
# Excel workbooks (.xlsx) — Kilo #10740 (stdlib instead of a parser lib)
|
||||
# ──────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def _extract_xlsx(path: str) -> str:
    """Extract visible worksheets as labelled tab-separated text.

    An ``.xlsx`` is a Zip of OOXML parts:
      * ``xl/workbook.xml``            — sheet names + visibility + rId mapping
      * ``xl/_rels/workbook.xml.rels`` — rId → worksheet part path
      * ``xl/sharedStrings.xml``       — interned string table
      * ``xl/worksheets/sheetN.xml``   — cell data (values reference shared strings)

    Hidden and very-hidden sheets are omitted. Each remaining sheet is
    rendered as a ``# ── Sheet: <name> ──`` header followed by one
    tab-separated line per row, or ``(empty)`` when it has no rows. Sheets
    whose part cannot be located or parsed are skipped.

    Args:
        path: Local filesystem path of the workbook.

    Returns:
        The rendered text, ending with exactly one newline.

    Raises:
        ExtractionError: if the file is not a readable zip, lacks or has a
            malformed ``xl/workbook.xml``, or yields no visible sheets.
    """
    try:
        zf = zipfile.ZipFile(path)
    except (zipfile.BadZipFile, OSError) as exc:
        raise ExtractionError(f"Not a valid XLSX (zip) file: {exc}") from exc

    with zf:
        names = set(zf.namelist())
        if "xl/workbook.xml" not in names:
            raise ExtractionError("XLSX missing xl/workbook.xml")

        # Convert parse and read failures into ExtractionError, which is the
        # only exception type this module documents.
        try:
            shared = _read_shared_strings(zf, names)
            sheets = _read_workbook_sheets(zf)
            rels = _read_workbook_rels(zf, names)
        except ET.ParseError as exc:
            raise ExtractionError(f"XLSX workbook.xml is malformed: {exc}") from exc
        except (zipfile.BadZipFile, OSError, RuntimeError) as exc:
            raise ExtractionError(f"XLSX package is unreadable: {exc}") from exc

        out: list[str] = []
        for sheet in sheets:
            if sheet["state"] in ("hidden", "veryHidden"):
                continue
            target = rels.get(sheet["rid"])
            if not target:
                # No relationship entry, so the sheet's part cannot be
                # located; skip it rather than guessing a part name.
                continue
            part = _normalize_sheet_target(target)
            if part not in names:
                continue
            try:
                rows = _read_sheet_rows(zf.read(part), shared)
            except (ET.ParseError, zipfile.BadZipFile, OSError, RuntimeError):
                # One damaged sheet should not hide the readable ones.
                continue
            out.append(f"# ── Sheet: {sheet['name']} ──")
            if rows:
                out.extend("\t".join(r) for r in rows)
            else:
                out.append("(empty)")
            out.append("")

    if not out:
        raise ExtractionError("XLSX has no visible sheets with content")

    return "\n".join(out).rstrip("\n") + "\n"
|
||||
|
||||
|
||||
def _read_shared_strings(zf: zipfile.ZipFile, names: set) -> list:
    """Load the workbook's shared-string table as a list of plain strings.

    Args:
        zf: Open workbook archive.
        names: Member names present in ``zf``.

    Returns:
        One string per ``<si>`` item, in table order. Returns an empty list
        when the part is absent or its XML is malformed.
    """
    if "xl/sharedStrings.xml" not in names:
        return []
    try:
        root = ET.fromstring(zf.read("xl/sharedStrings.xml"))
    except ET.ParseError:
        return []
    s = "{%s}" % _NS_S
    t_tag = f"{s}t"
    r_tag = f"{s}r"
    result: list[str] = []
    for si in root.iter(f"{s}si"):
        # An item is either a single <t> or rich-text runs <r><t>. Phonetic
        # runs (<rPh><t>) hold reading hints, not cell text, so only direct
        # <t> children and <r> runs are collected.
        pieces: list[str] = []
        for child in si:
            if child.tag == t_tag:
                pieces.append(child.text or "")
            elif child.tag == r_tag:
                pieces.extend(t.text or "" for t in child.iter(t_tag))
        result.append("".join(pieces))
    return result
|
||||
|
||||
|
||||
def _read_workbook_sheets(zf: zipfile.ZipFile) -> list:
    """List the sheets declared in ``xl/workbook.xml``.

    Returns one dict per ``<sheet>`` element, in document order, with keys
    ``name`` (default ``"Sheet"``), ``state`` (default ``"visible"``) and
    ``rid`` (the relationship id, default ``""``).
    """
    workbook = ET.fromstring(zf.read("xl/workbook.xml"))
    sheet_tag = "{%s}sheet" % _NS_S
    rid_attr = "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}id"
    return [
        {
            "name": element.get("name", "Sheet"),
            "state": element.get("state", "visible"),
            "rid": element.get(rid_attr, ""),
        }
        for element in workbook.iter(sheet_tag)
    ]
|
||||
|
||||
|
||||
def _read_workbook_rels(zf: zipfile.ZipFile, names: set) -> dict:
    """Map relationship ids to targets from ``xl/_rels/workbook.xml.rels``.

    Relationships lacking an ``Id`` or a ``Target`` are ignored. Returns an
    empty dict when the part is absent or its XML is malformed.
    """
    rels_path = "xl/_rels/workbook.xml.rels"
    if rels_path not in names:
        return {}
    try:
        rels_root = ET.fromstring(zf.read(rels_path))
    except ET.ParseError:
        return {}
    rel_tag = "{http://schemas.openxmlformats.org/package/2006/relationships}Relationship"
    pairs = (
        (rel.get("Id", ""), rel.get("Target", ""))
        for rel in rels_root.iter(rel_tag)
    )
    return {rid: target for rid, target in pairs if rid and target}
|
||||
|
||||
|
||||
def _normalize_sheet_target(target: str) -> str:
    """Turn a workbook-rels target into an archive member name.

    Leading slashes are dropped; a target that does not already start with
    ``xl/`` gets that prefix (e.g. ``worksheets/sheet1.xml``).
    """
    stripped = target.lstrip("/")
    return stripped if stripped.startswith("xl/") else "xl/" + stripped
|
||||
|
||||
|
||||
def _col_index(cell_ref: str) -> int:
    """Return the 0-based column index encoded by an A1-style cell reference.

    Only the alphabetic characters of ``cell_ref`` are used, read as a
    base-26 number with ``A`` = 1. A reference without letters yields 0.
    """
    value = 0
    for ch in cell_ref:
        if ch.isalpha():
            value = value * 26 + (ord(ch.upper()) - ord("A") + 1)
    return max(value - 1, 0)
|
||||
|
||||
|
||||
def _read_sheet_rows(xml_bytes: bytes, shared: list) -> list:
    """Parse worksheet XML into a list of rows of cell strings.

    At most ``_MAX_XLSX_ROWS_PER_SHEET`` rows are read and cells at or
    beyond column ``_MAX_XLSX_COLS`` are dropped. Gaps between populated
    columns are filled with ``""`` and trailing blank rows are removed.
    """
    ns = "{%s}" % _NS_S
    row_tag = f"{ns}row"
    cell_tag = f"{ns}c"
    rendered: list[list[str]] = []
    for seen, row in enumerate(ET.fromstring(xml_bytes).iter(row_tag)):
        if seen >= _MAX_XLSX_ROWS_PER_SHEET:
            break
        by_col: dict[int, str] = {}
        widest = -1
        for cell in row.iter(cell_tag):
            ref = cell.get("r", "")
            # Cells without a reference take the column after the widest so far.
            col = _col_index(ref) if ref else widest + 1
            if col >= _MAX_XLSX_COLS:
                continue
            by_col[col] = _cell_value(cell, shared, ns)
            widest = max(widest, col)
        # A row without usable cells becomes an empty list (range(0)).
        rendered.append([by_col.get(i, "") for i in range(widest + 1)])
    # Trim trailing fully-empty rows.
    while rendered and not any(text.strip() for text in rendered[-1]):
        rendered.pop()
    return rendered
|
||||
|
||||
|
||||
def _cell_value(c, shared: list, s: str) -> str:
    """Render one worksheet ``<c>`` element as text.

    Args:
        c: The cell element.
        shared: Shared-string table used for ``t="s"`` cells.
        s: SpreadsheetML namespace prefix in ``{uri}`` form.

    Returns:
        The cell text by type: shared string (``s``), inline string
        (``inlineStr``), ``TRUE``/``FALSE`` (``b``), the error code or
        ``#ERROR`` (``e``), otherwise the raw ``<v>`` text. Missing values
        and invalid shared-string indexes yield ``""``.
    """
    cell_type = c.get("t", "")
    v = c.find(f"{s}v")
    raw = v.text if v is not None else None

    if cell_type == "s":
        # Shared-string index.
        if raw is not None:
            try:
                idx = int(raw)
            except ValueError:
                return ""
            if 0 <= idx < len(shared):
                return shared[idx]
        return ""

    if cell_type == "inlineStr":
        is_node = c.find(f"{s}is")
        if is_node is None:
            return ""
        t_tag = f"{s}t"
        r_tag = f"{s}r"
        # Collect the plain <t> and rich-text runs <r><t> only. Phonetic runs
        # (<rPh><t>) hold reading hints, not cell text.
        pieces = []
        for child in is_node:
            if child.tag == t_tag:
                pieces.append(child.text or "")
            elif child.tag == r_tag:
                pieces.extend(t.text or "" for t in child.iter(t_tag))
        return "".join(pieces)

    if cell_type == "b":
        if raw is None:
            return ""
        return "TRUE" if raw.strip() in ("1", "true", "TRUE") else "FALSE"

    if cell_type == "e":
        # Error value (e.g. #DIV/0!).
        return raw if raw is not None else "#ERROR"

    # Formula result strings ("str") and numeric/general cells use <v> as-is.
    return raw if raw is not None else ""
|
||||
Reference in New Issue
Block a user