Compare commits

...

1 Commits

Author SHA1 Message Date
teknium1
ff7f01375b feat(read): extract .ipynb/.docx/.xlsx to text in read_file
Port from Kilo-Org/kilocode #10733, #10737, #10740: structured-document
reading in the read tool.

read_file now renders Jupyter notebooks, Word documents, and Excel
workbooks to plain text instead of rejecting them as binary (.docx/.xlsx)
or dumping raw JSON with output payloads (.ipynb). Extracted text flows
through the existing pagination, line-numbering, char-limit and redaction
pipeline, so output is identical in shape to a normal text read.

Unlike Kilo (which bundles the mammoth JS lib for DOCX), this uses a
pure-stdlib approach -- .docx and .xlsx are Zip+OOXML containers that
zipfile + xml.etree unpack, and .ipynb is JSON. No new dependency.

- tools/read_extract.py: stdlib extractors + extract_document_text router
- tools/file_tools.py: intercept extractable docs before the binary guard;
  malformed files fall through to the normal read path (stay inspectable)
- tests/tools/test_read_extract.py: 18 tests (extraction + integration)
2026-06-01 17:07:11 -07:00
3 changed files with 809 additions and 1 deletions

View File

@@ -0,0 +1,295 @@
#!/usr/bin/env python3
"""
Tests for structured-document extraction in the read_file tool.
Covers .ipynb / .docx / .xlsx extraction (ported from Kilo-Org/kilocode
#10733, #10737, #10740) and the read_file_tool integration: pagination,
line-numbering, graceful fallback on malformed input, and hidden-sheet
omission.
Run with: python -m pytest tests/tools/test_read_extract.py -v
"""
import json
import os
import tempfile
import unittest
import zipfile
from tools.read_extract import (
ExtractionError,
extract_document_text,
is_extractable_document,
)
from tools.file_tools import read_file_tool
# ---------------------------------------------------------------------------
# Fixture builders — construct minimal valid OOXML / notebook files.
# ---------------------------------------------------------------------------
def _write_notebook(path, cells, nbformat=4):
nb = {"cells": cells, "metadata": {}, "nbformat": nbformat, "nbformat_minor": 5}
with open(path, "w", encoding="utf-8") as fh:
json.dump(nb, fh)
def _write_docx(path, document_xml):
with zipfile.ZipFile(path, "w") as z:
z.writestr("[Content_Types].xml", "<Types/>")
z.writestr("word/document.xml", document_xml)
def _write_xlsx(path, *, workbook, rels, shared, sheets):
"""sheets: dict of part-name -> xml string."""
with zipfile.ZipFile(path, "w") as z:
z.writestr("xl/workbook.xml", workbook)
z.writestr("xl/_rels/workbook.xml.rels", rels)
if shared is not None:
z.writestr("xl/sharedStrings.xml", shared)
for part, xml in sheets.items():
z.writestr(part, xml)
# XML namespace URIs interpolated into the DOCX (``w:``) and XLSX fixture
# markup built by the test classes below.
_NS_W = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
_NS_S = "http://schemas.openxmlformats.org/spreadsheetml/2006/main"
# ---------------------------------------------------------------------------
# is_extractable_document
# ---------------------------------------------------------------------------
class TestIsExtractable(unittest.TestCase):
    """Extension-based detection of documents the extractor can render."""

    def test_recognized_extensions(self):
        # Includes an upper-case extension to cover case-insensitive matching.
        for candidate in ("a.ipynb", "/x/B.DOCX", "report.xlsx"):
            with self.subTest(path=candidate):
                self.assertTrue(is_extractable_document(candidate))

    def test_unrecognized_extensions(self):
        for candidate in ("a.py", "a.pdf", "a.txt"):
            with self.subTest(path=candidate):
                self.assertFalse(is_extractable_document(candidate))
# ---------------------------------------------------------------------------
# Notebooks (.ipynb) — #10733
# ---------------------------------------------------------------------------
class TestNotebookExtraction(unittest.TestCase):
    """Extraction of ``.ipynb`` notebooks to plain text."""

    def setUp(self):
        import shutil

        # Each test gets a private scratch directory, removed afterwards.
        self.tmp = tempfile.mkdtemp(prefix="rex_nb_")
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)

    def _path(self, filename):
        """Return the path of ``filename`` inside the scratch directory."""
        return os.path.join(self.tmp, filename)

    def test_markdown_and_code_in_order(self):
        target = self._path("nb.ipynb")
        markdown_cell = {"cell_type": "markdown", "source": ["# Title\n", "para"]}
        code_cell = {
            "cell_type": "code",
            "source": "x = 1\nprint(x)",
            "outputs": [{"output_type": "stream", "text": ["1\n"]}],
            "execution_count": 1,
        }
        _write_notebook(target, [markdown_cell, code_cell])

        extracted = extract_document_text(target)

        for expected in ("# Title", "print(x)"):
            self.assertIn(expected, extracted)
        # Output payloads must NOT leak into the extracted text.
        for leaked in ("output_type", "execution_count"):
            self.assertNotIn(leaked, extracted)
        # Order preserved: markdown before code.
        self.assertLess(extracted.index("Title"), extracted.index("print(x)"))

    def test_string_source_form(self):
        target = self._path("nb2.ipynb")
        _write_notebook(target, [{"cell_type": "code", "source": "single string source"}])
        extracted = extract_document_text(target)
        self.assertIn("single string source", extracted)

    def test_legacy_worksheets_form(self):
        target = self._path("nb3.ipynb")
        legacy_cell = {"cell_type": "code", "input": "ignored", "source": "legacy cell"}
        notebook = {"worksheets": [{"cells": [legacy_cell]}], "nbformat": 3}
        with open(target, "w") as handle:
            handle.write(json.dumps(notebook))
        self.assertIn("legacy cell", extract_document_text(target))

    def test_malformed_notebook_raises(self):
        target = self._path("bad.ipynb")
        with open(target, "w") as handle:
            handle.write("{ not valid json")
        self.assertRaises(ExtractionError, extract_document_text, target)

    def test_empty_cells_raises(self):
        target = self._path("empty.ipynb")
        _write_notebook(target, [])
        self.assertRaises(ExtractionError, extract_document_text, target)
# ---------------------------------------------------------------------------
# Word documents (.docx) — #10737
# ---------------------------------------------------------------------------
class TestDocxExtraction(unittest.TestCase):
    """Extraction of ``.docx`` documents to plain text."""

    def setUp(self):
        import shutil

        # Each test gets a private scratch directory, removed afterwards.
        self.tmp = tempfile.mkdtemp(prefix="rex_docx_")
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)

    def _path(self, filename):
        """Return the path of ``filename`` inside the scratch directory."""
        return os.path.join(self.tmp, filename)

    def _doc(self, body):
        """Wrap ``body`` markup in a minimal WordprocessingML document."""
        opening = f'<?xml version="1.0"?><w:document xmlns:w="{_NS_W}">'
        return opening + f'<w:body>{body}</w:body></w:document>'

    def test_paragraphs_and_runs(self):
        target = self._path("d.docx")
        first_para = '<w:p><w:r><w:t>Hello </w:t></w:r><w:r><w:t>World</w:t></w:r></w:p>'
        second_para = '<w:p><w:r><w:t>Second</w:t></w:r></w:p>'
        _write_docx(target, self._doc(first_para + second_para))

        extracted = extract_document_text(target)

        self.assertIn("Hello World", extracted)
        self.assertIn("Second", extracted)

    def test_tabs_and_breaks(self):
        target = self._path("d2.docx")
        body = '<w:p><w:r><w:t>A</w:t><w:tab/><w:t>B</w:t><w:br/><w:t>C</w:t></w:r></w:p>'
        _write_docx(target, self._doc(body))

        extracted = extract_document_text(target)

        self.assertIn("A\tB", extracted)
        self.assertIn("C", extracted)

    def test_not_a_zip_raises(self):
        target = self._path("bad.docx")
        with open(target, "wb") as handle:
            handle.write(b"plain bytes, not a zip")
        self.assertRaises(ExtractionError, extract_document_text, target)

    def test_missing_document_xml_raises(self):
        target = self._path("nodoc.docx")
        with zipfile.ZipFile(target, "w") as archive:
            archive.writestr("other.xml", "<x/>")
        self.assertRaises(ExtractionError, extract_document_text, target)
# ---------------------------------------------------------------------------
# Excel workbooks (.xlsx) — #10740
# ---------------------------------------------------------------------------
class TestXlsxExtraction(unittest.TestCase):
    """Extraction of ``.xlsx`` workbooks to plain text."""

    def setUp(self):
        import shutil

        # Each test gets a private scratch directory, removed afterwards.
        self.tmp = tempfile.mkdtemp(prefix="rex_xlsx_")
        self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True)

    def _build(self, path, *, include_hidden=True):
        """Write a workbook with a visible "Data" sheet and, optionally, a hidden one.

        Both worksheet parts and both relationships are always written;
        ``include_hidden`` only controls whether the hidden sheet is declared
        in ``xl/workbook.xml``.
        """
        rel_ns = "http://schemas.openxmlformats.org/officeDocument/2006/relationships"

        hidden_sheet = ""
        if include_hidden:
            hidden_sheet = (f'<sheet name="Hidden" sheetId="2" state="hidden" '
                            f'xmlns:r="{rel_ns}" r:id="rId2"/>')

        workbook = "".join((
            f'<workbook xmlns="{_NS_S}" xmlns:r="{rel_ns}"><sheets>',
            f'<sheet name="Data" sheetId="1" r:id="rId1"/>{hidden_sheet}',
            '</sheets></workbook>',
        ))
        rels = "".join((
            '<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">',
            '<Relationship Id="rId1" Target="worksheets/sheet1.xml" Type="x"/>',
            '<Relationship Id="rId2" Target="worksheets/sheet2.xml" Type="x"/>',
            '</Relationships>',
        ))
        shared = "".join((
            f'<sst xmlns="{_NS_S}"><si><t>Name</t></si><si><t>Score</t></si>',
            '<si><t>Alice</t></si></sst>',
        ))
        visible_sheet = "".join((
            f'<worksheet xmlns="{_NS_S}"><sheetData>',
            '<row r="1"><c r="A1" t="s"><v>0</v></c><c r="B1" t="s"><v>1</v></c></row>',
            '<row r="2"><c r="A2" t="s"><v>2</v></c><c r="B2"><v>95</v></c></row>',
            '</sheetData></worksheet>',
        ))
        secret_sheet = "".join((
            f'<worksheet xmlns="{_NS_S}"><sheetData>',
            '<row r="1"><c r="A1" t="str"><v>SECRETDATA</v></c></row>',
            '</sheetData></worksheet>',
        ))
        _write_xlsx(
            path,
            workbook=workbook,
            rels=rels,
            shared=shared,
            sheets={
                "xl/worksheets/sheet1.xml": visible_sheet,
                "xl/worksheets/sheet2.xml": secret_sheet,
            },
        )

    def test_visible_sheet_content(self):
        target = os.path.join(self.tmp, "wb.xlsx")
        self._build(target)
        extracted = extract_document_text(target)
        self.assertIn("Data", extracted)  # sheet label
        self.assertIn("Name\tScore", extracted)  # shared-string header row
        self.assertIn("Alice\t95", extracted)  # string + numeric cells

    def test_hidden_sheet_omitted(self):
        target = os.path.join(self.tmp, "wb2.xlsx")
        self._build(target)
        extracted = extract_document_text(target)
        for forbidden in ("SECRETDATA", "Hidden"):
            self.assertNotIn(forbidden, extracted)

    def test_not_a_zip_raises(self):
        target = os.path.join(self.tmp, "bad.xlsx")
        with open(target, "wb") as handle:
            handle.write(b"nope")
        self.assertRaises(ExtractionError, extract_document_text, target)
# ---------------------------------------------------------------------------
# read_file_tool integration
# ---------------------------------------------------------------------------
class TestReadFileToolIntegration(unittest.TestCase):
    """End-to-end checks of ``read_file_tool`` on extractable documents.

    Covers line numbering, pagination, the fall-through path for corrupt
    documents, and DOCX extraction through the public tool entry point.
    """

    def setUp(self):
        self.tmp = tempfile.mkdtemp(prefix="rex_int_")

    def tearDown(self):
        import shutil
        shutil.rmtree(self.tmp, ignore_errors=True)

    def test_notebook_read_is_line_numbered(self):
        p = os.path.join(self.tmp, "nb.ipynb")
        _write_notebook(p, [
            {"cell_type": "markdown", "source": "# H"},
            {"cell_type": "code", "source": "print(1)"},
        ])
        res = json.loads(read_file_tool(p))
        self.assertTrue(res.get("extracted_document"))
        self.assertIn("1|", res["content"])  # line-number gutter
        self.assertIn("print(1)", res["content"])

    def test_pagination(self):
        p = os.path.join(self.tmp, "nb.ipynb")
        _write_notebook(p, [
            {"cell_type": "code", "source": "a\nb\nc\nd\ne\nf"},
        ])
        res = json.loads(read_file_tool(p, offset=1, limit=2))
        self.assertTrue(res.get("truncated"))
        self.assertIn("offset=3", res.get("hint", ""))
        content = res["content"]
        # The page holds exactly the first two extracted lines: the cell
        # header and the first source line.
        self.assertIn("1|# ── Code cell 1 ──", content)
        self.assertIn("2|a", content)
        # Anything past the limit must be absent (line 3 would be "3|b").
        self.assertNotIn("|b", content)
        # total_lines counts the whole document, not just the page:
        # one header line plus six source lines.
        self.assertEqual(res["total_lines"], 7)

    def test_corrupt_docx_falls_through_to_binary_guard(self):
        p = os.path.join(self.tmp, "bad.docx")
        with open(p, "wb") as fh:
            fh.write(b"not a zip")
        res = json.loads(read_file_tool(p))
        # Should NOT crash; falls through to the binary-extension guard.
        self.assertIn("error", res)
        self.assertIn("binary", res["error"].lower())

    def test_docx_read_extracts(self):
        p = os.path.join(self.tmp, "d.docx")
        _write_docx(p, (f'<?xml version="1.0"?><w:document xmlns:w="{_NS_W}">'
                        '<w:body><w:p><w:r><w:t>Report body</w:t></w:r></w:p>'
                        '</w:body></w:document>'))
        res = json.loads(read_file_tool(p))
        self.assertTrue(res.get("extracted_document"))
        self.assertIn("Report body", res["content"])
# Allow running this test module directly as a script (outside pytest).
if __name__ == "__main__":
    unittest.main()

View File

@@ -7,9 +7,15 @@ import logging
import os
import threading
from pathlib import Path
from typing import Optional
from agent.file_safety import get_read_block_error
from tools.binary_extensions import has_binary_extension
from tools.read_extract import (
ExtractionError,
extract_document_text,
is_extractable_document,
)
from tools.file_operations import (
ShellFileOperations,
normalize_read_pagination,
@@ -626,6 +632,87 @@ def clear_file_ops_cache(task_id: str = None):
_file_ops_cache.clear()
def _read_extracted_document(
    resolved_str: str,
    display_path: str,
    offset: int,
    limit: int,
    task_id: str = "default",
) -> Optional[str]:
    """Produce a paginated, line-numbered read of a structured document.

    The document at ``resolved_str`` is extracted to plain text, sliced to the
    requested page, line-numbered with the shared formatter, checked against
    the read char limit, and redacted.

    Args:
        resolved_str: Path handed to the extractor.
        display_path: Path used in the log message and in the error payload.
        offset: 1-indexed first line of the page.
        limit: Maximum number of lines on the page.
        task_id: Selects the file-operations object used for line numbering.

    Returns:
        A JSON string (the tool result, or an error object when the page
        exceeds the char limit), or ``None`` when extraction failed so the
        caller can fall through to its normal read path.
    """
    try:
        extracted = extract_document_text(resolved_str)
    except ExtractionError:
        # Malformed/unsupported document: signal the caller to fall through.
        return None
    except Exception:
        logger.debug("document extraction failed for %s", display_path, exc_info=True)
        return None

    all_lines = extracted.split("\n")
    # A trailing newline yields an empty final element; drop it so the count
    # covers content lines only.
    if all_lines and all_lines[-1] == "":
        del all_lines[-1]
    total_lines = len(all_lines)

    first = offset - 1  # offset is 1-indexed
    stop = first + limit
    page_text = "\n".join(all_lines[first:stop])
    truncated = stop < total_lines

    # Number the page with the shared formatter (skipped for an empty page).
    file_ops = _get_file_ops(task_id)
    numbered = file_ops._add_line_numbers(page_text, offset) if page_text else ""

    # Char-count guard, evaluated before redaction.
    max_chars = _get_max_read_chars()
    if len(numbered) > max_chars:
        message = (
            f"Read produced {len(numbered):,} characters which exceeds "
            f"the safety limit ({max_chars:,} chars). "
            "Use offset and limit to read a smaller range. "
            f"The document has {total_lines} lines of extracted text."
        )
        failure = {
            "error": message,
            "path": display_path,
            "total_lines": total_lines,
        }
        return json.dumps(failure, ensure_ascii=False)

    result_dict = {
        "content": redact_sensitive_text(numbered, code_file=True),
        "total_lines": total_lines,
        "truncated": truncated,
        "extracted_document": True,
    }
    if truncated:
        result_dict["hint"] = (
            f"Use offset={stop + 1} to continue reading "
            f"(showing {offset}-{min(stop, total_lines)} of {total_lines} lines)"
        )
    return json.dumps(result_dict, ensure_ascii=False)
def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = "default") -> str:
"""Read a file with pagination and line numbers."""
try:
@@ -644,6 +731,23 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
_resolved = _resolve_path_for_task(path, task_id)
# ── Structured-document extraction ────────────────────────────
# .ipynb / .docx / .xlsx render to plain text in-process so the
# agent can read their content directly. Raw .ipynb JSON drowns
# the model in metadata + output payloads, and .docx/.xlsx are
# otherwise rejected as binary. Extracted text flows through the
# same pagination / line-numbering / char-limit / redaction
# pipeline as a normal text read. Malformed documents fall back to
# the normal read path (raw text / binary guard) so they stay
# inspectable. Ported from Kilo-Org/kilocode #10733, #10737, #10740.
if is_extractable_document(str(_resolved)):
_doc_result = _read_extracted_document(
str(_resolved), path, offset, limit, task_id
)
if _doc_result is not None:
return _doc_result
# else: extraction failed → fall through to normal read path.
# ── Binary file guard ─────────────────────────────────────────
# Block binary files by extension (no I/O).
if has_binary_extension(str(_resolved)):
@@ -1311,7 +1415,7 @@ def _check_file_reqs():
READ_FILE_SCHEMA = {
"name": "read_file",
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Cannot read images or binary files — use vision_analyze for images.",
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. Jupyter notebooks (.ipynb), Word documents (.docx), and Excel workbooks (.xlsx) are auto-extracted to readable text. NOTE: Cannot read images or other binary files — use vision_analyze for images.",
"parameters": {
"type": "object",
"properties": {

409
tools/read_extract.py Normal file
View File

@@ -0,0 +1,409 @@
"""Document text extraction for the read_file tool.
Ported/adapted from Kilo-Org/kilocode PRs #10733 (notebooks), #10737 (DOCX),
and #10740 (XLSX), which added structured-document reading to their CLI `read`
tool. Kilo bundled the `mammoth` JS library for DOCX; hermes-agent instead uses
a pure-stdlib approach (``json`` + ``zipfile`` + ``xml.etree``) so no new Python
dependency is added — ``.docx`` and ``.xlsx`` are both Zip+OOXML containers that
stdlib can unpack and parse.
The router (:func:`extract_document_text`) returns a plain-text rendering of the
document. The caller (``read_file_tool``) then routes that text through the
existing line-numbering, pagination, truncation, char-limit and redaction
pipeline — exactly as it does for a normal text file. That keeps a single set of
output semantics for every readable format.
Design constraints (from the hermes-agent-dev skill):
* No new hard dependency. Everything here is stdlib.
* Extraction reads local bytes directly (works regardless of terminal
backend, since the file is resolved to a host path before we get here).
* Malformed inputs degrade gracefully: callers fall back to raw-text reading
so the file stays inspectable rather than throwing an opaque error.
"""
from __future__ import annotations
import json
import zipfile
from typing import Optional
from xml.etree import ElementTree as ET
# Public API of this module.
__all__ = [
    "EXTRACTABLE_EXTENSIONS",
    "is_extractable_document",
    "extract_document_text",
    "ExtractionError",
]
# Extensions we can render to text in-process. Lowercase, leading dot.
EXTRACTABLE_EXTENSIONS = frozenset({".ipynb", ".docx", ".xlsx"})
# Workbook hard cap mirrors Kilo #10740 (reject >50 MB before parsing). Intended
# to be applied by the caller via file size; re-stated here as the documented
# contract.
# NOTE(review): nothing in the visible part of this module references this
# constant — confirm the caller actually enforces it.
MAX_XLSX_BYTES = 50 * 1024 * 1024
# Bound worksheet extraction so a pathological workbook can't blow up context
# before the read tool's own char-limit guard runs. Generous — the read tool
# truncates afterward anyway.
_MAX_XLSX_ROWS_PER_SHEET = 5000
_MAX_XLSX_COLS = 256
# OOXML namespaces: WordprocessingML (DOCX body) and SpreadsheetML (XLSX parts).
_NS_W = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
_NS_S = "http://schemas.openxmlformats.org/spreadsheetml/2006/main"
class ExtractionError(Exception):
    """Signal that a document could not be rendered to text.

    Callers are expected to catch this and fall back to their ordinary read
    path, so a malformed file remains inspectable instead of producing an
    opaque failure (this mirrors Kilo's malformed-notebook behavior).
    """
def is_extractable_document(path: str) -> bool:
    """Report whether ``path`` ends with an extension we can render to text.

    The comparison is case-insensitive.
    """
    return path.lower().endswith(tuple(EXTRACTABLE_EXTENSIONS))
def _ext_of(path: str) -> str:
    """Return the extractable extension that ``path`` ends with, or ``""``.

    Matching is case-insensitive; the returned extension is lowercase.
    """
    lowered = path.lower()
    matches = (ext for ext in EXTRACTABLE_EXTENSIONS if lowered.endswith(ext))
    return next(matches, "")
def extract_document_text(path: str) -> str:
    """Convert a supported document into plain text.

    Args:
        path: Local filesystem path to a ``.ipynb`` / ``.docx`` / ``.xlsx`` file.

    Returns:
        Plain-text rendering suitable for line-numbered display.

    Raises:
        ExtractionError: if the extension is not supported, or the file is
            malformed or can't be parsed. The caller should fall back to
            raw-text reading.
    """
    ext = _ext_of(path)
    # Route by extension to the format-specific extractor.
    extractors = {
        ".ipynb": _extract_notebook,
        ".docx": _extract_docx,
        ".xlsx": _extract_xlsx,
    }
    extractor = extractors.get(ext)
    if extractor is None:
        raise ExtractionError(f"Unsupported document type: {ext or path!r}")
    return extractor(path)
# ──────────────────────────────────────────────────────────────────────────
# Jupyter notebooks (.ipynb) — Kilo #10733
# ──────────────────────────────────────────────────────────────────────────
def _extract_notebook(path: str) -> str:
    """Extract markdown, code and raw cell sources in document order.

    Raw ``.ipynb`` JSON drowns the model in metadata and output payloads
    (base64 images, execution counts, stream noise). Only the cell sources are
    kept, each preceded by a header line naming the cell type, so the agent
    sees the actual document.

    Both layouts are supported: a top-level ``cells`` list, and the legacy
    ``worksheets[].cells`` layout. Legacy code cells store their code under
    ``input`` rather than ``source``, so ``input`` is used as a fallback when
    a code cell has no ``source``.

    Args:
        path: Local path of the notebook file.

    Returns:
        The rendered text, terminated by a single newline.

    Raises:
        ExtractionError: if the file can't be read or parsed as JSON, the root
            is not an object, no cells list is found, or no readable cell
            exists.
    """
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as fh:
            nb = json.load(fh)
    except (json.JSONDecodeError, OSError, ValueError) as exc:
        raise ExtractionError(f"Not a valid notebook: {exc}") from exc
    if not isinstance(nb, dict):
        raise ExtractionError("Notebook root is not a JSON object")

    cells = nb.get("cells")
    if not isinstance(cells, list):
        # nbformat < 4 stored cells under worksheets[].cells.
        worksheets = nb.get("worksheets")
        if not (isinstance(worksheets, list) and worksheets):
            raise ExtractionError("Notebook has no cells array")
        cells = []
        for ws in worksheets:
            if isinstance(ws, dict) and isinstance(ws.get("cells"), list):
                cells.extend(ws["cells"])

    parts: list[str] = []
    code_n = 0
    md_n = 0
    for cell in cells:
        if not isinstance(cell, dict):
            continue
        cell_type = cell.get("cell_type", "")
        if cell_type == "markdown":
            md_n += 1
            header = f"# ── Markdown cell {md_n} ──"
        elif cell_type == "code":
            code_n += 1
            header = f"# ── Code cell {code_n} ──"
        elif cell_type == "raw":
            header = "# ── Raw cell ──"
        else:
            # Unknown cell types are skipped silently.
            continue

        raw_source = cell.get("source")
        if raw_source is None and cell_type == "code":
            # Legacy (nbformat < 4) code cells keep their code under "input".
            raw_source = cell.get("input")
        parts.append(header)
        parts.append(_join_source(raw_source).rstrip("\n"))
        parts.append("")  # blank separator line between cells

    if not parts:
        raise ExtractionError("Notebook contains no readable cells")
    return "\n".join(parts).rstrip("\n") + "\n"
def _join_source(source) -> str:
"""Notebook ``source`` is either a string or a list of line strings."""
if isinstance(source, list):
return "".join(s for s in source if isinstance(s, str))
if isinstance(source, str):
return source
return ""
# ──────────────────────────────────────────────────────────────────────────
# Word documents (.docx) — Kilo #10737 (stdlib instead of mammoth)
# ──────────────────────────────────────────────────────────────────────────
def _extract_docx(path: str) -> str:
    """Extract paragraph text from a DOCX in document order.

    A ``.docx`` is a Zip container; the body text lives in
    ``word/document.xml`` as ``<w:p>`` paragraphs containing ``<w:t>`` text
    runs. Paragraphs are walked in order, their runs joined, and one line is
    emitted per paragraph. ``<w:tab>`` becomes a tab and ``<w:br>``/``<w:cr>``
    become newlines so basic layout survives.

    A paragraph nested inside another paragraph (e.g. text-box content) is
    emitted once, as its own line(s) after the enclosing paragraph, and is not
    repeated inside the enclosing paragraph's text.

    Args:
        path: Local path of the DOCX file.

    Returns:
        The rendered text, terminated by a single newline.

    Raises:
        ExtractionError: if the file is not a readable zip, lacks
            ``word/document.xml``, that part is malformed XML, or no
            non-blank text is found.
    """
    try:
        with zipfile.ZipFile(path) as zf:
            try:
                xml_bytes = zf.read("word/document.xml")
            except KeyError as exc:
                raise ExtractionError("DOCX missing word/document.xml") from exc
    except (zipfile.BadZipFile, OSError) as exc:
        raise ExtractionError(f"Not a valid DOCX (zip) file: {exc}") from exc
    try:
        root = ET.fromstring(xml_bytes)
    except ET.ParseError as exc:
        raise ExtractionError(f"DOCX document.xml is malformed: {exc}") from exc

    w = "{%s}" % _NS_W
    para_tag = f"{w}p"
    text_tag = f"{w}t"
    tab_tag = f"{w}tab"
    break_tags = (f"{w}br", f"{w}cr")

    lines: list[str] = []
    # root.iter() yields every paragraph in document order, including ones
    # nested inside other elements (tables, text boxes).
    for para in root.iter(para_tag):
        buf: list[str] = []
        # Pre-order walk of this paragraph's descendants that stops at nested
        # paragraphs: the outer loop visits those separately, so descending
        # into them here would emit their text twice.
        pending = list(para)[::-1]
        while pending:
            node = pending.pop()
            tag = node.tag
            if tag == para_tag:
                continue
            if tag == text_tag:
                buf.append(node.text or "")
            elif tag == tab_tag:
                buf.append("\t")
            elif tag in break_tags:
                buf.append("\n")
            # Push children reversed so the first child is popped first.
            pending.extend(list(node)[::-1])
        # A paragraph may itself contain explicit line breaks.
        lines.extend("".join(buf).split("\n"))

    if not any(line.strip() for line in lines):
        raise ExtractionError("DOCX contains no extractable text")
    return "\n".join(lines).rstrip("\n") + "\n"
# ──────────────────────────────────────────────────────────────────────────
# Excel workbooks (.xlsx) — Kilo #10740 (stdlib instead of a parser lib)
# ──────────────────────────────────────────────────────────────────────────
def _extract_xlsx(path: str) -> str:
    """Extract visible worksheets as labelled tab-separated text.

    An ``.xlsx`` is a Zip of OOXML parts:

    * ``xl/workbook.xml`` — sheet names + visibility + rId mapping
    * ``xl/_rels/workbook.xml.rels`` — rId → worksheet part path
    * ``xl/sharedStrings.xml`` — interned string table
    * ``xl/worksheets/sheetN.xml`` — cell data (values reference shared strings)

    Hidden sheets are omitted. A sheet is also skipped when its part cannot be
    located through the workbook relationships, is absent from the archive, or
    is malformed XML. Each remaining sheet is rendered as a header line, its
    rows as tab-joined cells (or ``(empty)``), and a blank separator line.

    Args:
        path: Local path of the XLSX file.

    Returns:
        The rendered text, terminated by a single newline.

    Raises:
        ExtractionError: if the file is not a readable zip, lacks
            ``xl/workbook.xml``, that part cannot be read or parsed, or no
            sheet produced output.
    """
    try:
        zf = zipfile.ZipFile(path)
    except (zipfile.BadZipFile, OSError) as exc:
        raise ExtractionError(f"Not a valid XLSX (zip) file: {exc}") from exc
    with zf:
        names = set(zf.namelist())
        if "xl/workbook.xml" not in names:
            raise ExtractionError("XLSX missing xl/workbook.xml")
        shared = _read_shared_strings(zf, names)
        try:
            sheets = _read_workbook_sheets(zf)
        except (ET.ParseError, zipfile.BadZipFile) as exc:
            # Without a parseable workbook the sheet list is unknown; report
            # it through the documented error type so callers fall back.
            raise ExtractionError(f"XLSX workbook.xml is malformed: {exc}") from exc
        rels = _read_workbook_rels(zf, names)

        out: list[str] = []
        for sheet in sheets:
            if sheet["state"] in ("hidden", "veryHidden"):
                continue
            target = rels.get(sheet["rid"])
            if not target:
                # No relationship entry for this sheet's rId, so its part
                # cannot be located; skip it (no positional guess is made).
                continue
            part = _normalize_sheet_target(target)
            if part not in names:
                continue
            try:
                rows = _read_sheet_rows(zf.read(part), shared)
            except ET.ParseError:
                # One malformed sheet should not hide the others.
                continue
            out.append(f"# ── Sheet: {sheet['name']} ──")
            if rows:
                out.extend("\t".join(r) for r in rows)
            else:
                out.append("(empty)")
            out.append("")

    if not out:
        raise ExtractionError("XLSX has no visible sheets with content")
    return "\n".join(out).rstrip("\n") + "\n"
def _read_shared_strings(zf: zipfile.ZipFile, names: set) -> list:
    """Load the workbook's shared-string table as a list of plain strings.

    Args:
        zf: Open workbook archive.
        names: Member names present in ``zf``.

    Returns:
        One string per ``<si>`` item, in table order. An empty list when
        ``xl/sharedStrings.xml`` is absent or is malformed XML.
    """
    if "xl/sharedStrings.xml" not in names:
        return []
    try:
        root = ET.fromstring(zf.read("xl/sharedStrings.xml"))
    except ET.ParseError:
        return []
    s = "{%s}" % _NS_S
    t_tag = f"{s}t"
    r_tag = f"{s}r"
    result: list[str] = []
    for si in root.iter(f"{s}si"):
        # A shared string item is either a single <t> or rich-text runs
        # <r><t>. Only those children are read: a blanket search for <t>
        # would also pick up phonetic runs (<rPh><t>), whose text is a
        # pronunciation hint and not part of the cell's value.
        pieces: list[str] = []
        for child in si:
            if child.tag == t_tag:
                pieces.append(child.text or "")
            elif child.tag == r_tag:
                pieces.extend(t.text or "" for t in child.iter(t_tag))
        result.append("".join(pieces))
    return result
def _read_workbook_sheets(zf: zipfile.ZipFile) -> list:
    """List the sheets declared in ``xl/workbook.xml``, in document order.

    Each entry is a dict with ``name`` (default ``"Sheet"``), ``state``
    (default ``"visible"``) and ``rid`` (the relationship id, default ``""``).
    XML parse errors are not handled here and propagate to the caller.
    """
    ns_main = "{%s}" % _NS_S
    ns_rel = "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}"
    document = ET.fromstring(zf.read("xl/workbook.xml"))
    return [
        {
            "name": node.get("name", "Sheet"),
            "state": node.get("state", "visible"),
            "rid": node.get(f"{ns_rel}id", ""),
        }
        for node in document.iter(f"{ns_main}sheet")
    ]
def _read_workbook_rels(zf: zipfile.ZipFile, names: set) -> dict:
rels_path = "xl/_rels/workbook.xml.rels"
if rels_path not in names:
return {}
try:
root = ET.fromstring(zf.read(rels_path))
except ET.ParseError:
return {}
pr = "{http://schemas.openxmlformats.org/package/2006/relationships}"
mapping = {}
for rel in root.iter(f"{pr}Relationship"):
rid = rel.get("Id", "")
target = rel.get("Target", "")
if rid and target:
mapping[rid] = target
return mapping
def _normalize_sheet_target(target: str) -> str:
"""Workbook rels target is relative to ``xl/`` (e.g. ``worksheets/sheet1.xml``)."""
target = target.lstrip("/")
if target.startswith("xl/"):
return target
return "xl/" + target
def _col_index(cell_ref: str) -> int:
"""Convert an A1-style cell ref's column letters to a 0-based index."""
letters = "".join(ch for ch in cell_ref if ch.isalpha())
idx = 0
for ch in letters:
idx = idx * 26 + (ord(ch.upper()) - ord("A") + 1)
return idx - 1 if idx > 0 else 0
def _read_sheet_rows(xml_bytes: bytes, shared: list) -> list:
    """Parse one worksheet part into a list of rows of cell strings.

    At most ``_MAX_XLSX_ROWS_PER_SHEET`` rows are read, and cells at or
    beyond column ``_MAX_XLSX_COLS`` are dropped. Each row is padded with
    ``""`` for gaps up to its last populated column; a row without cells
    becomes an empty list. Trailing rows with no non-blank cell are removed.
    XML parse errors propagate to the caller.
    """
    s = "{%s}" % _NS_S
    document = ET.fromstring(xml_bytes)

    rendered: list[list[str]] = []
    for seen, row in enumerate(document.iter(f"{s}row")):
        if seen >= _MAX_XLSX_ROWS_PER_SHEET:
            break
        by_col: dict[int, str] = {}
        widest = -1
        for c in row.iter(f"{s}c"):
            ref = c.get("r", "")
            # Cells without a reference take the column after the widest so far.
            col = _col_index(ref) if ref else widest + 1
            if col >= _MAX_XLSX_COLS:
                continue
            by_col[col] = _cell_value(c, shared, s)
            widest = max(widest, col)
        # With no cells, widest stays -1 and the row renders as [].
        rendered.append([by_col.get(i, "") for i in range(widest + 1)])

    # Trim trailing fully-empty rows.
    while rendered and all(not cell.strip() for cell in rendered[-1]):
        rendered.pop()
    return rendered
def _cell_value(c, shared: list, s: str) -> str:
cell_type = c.get("t", "")
v = c.find(f"{s}v")
if cell_type == "s":
# Shared-string index.
if v is not None and v.text is not None:
try:
idx = int(v.text)
if 0 <= idx < len(shared):
return shared[idx]
except ValueError:
pass
return ""
if cell_type == "inlineStr":
is_node = c.find(f"{s}is")
if is_node is not None:
return "".join(t.text or "" for t in is_node.iter(f"{s}t"))
return ""
if cell_type == "str":
# Formula result string.
return v.text if (v is not None and v.text is not None) else ""
if cell_type == "b":
if v is not None and v.text is not None:
return "TRUE" if v.text.strip() in ("1", "true", "TRUE") else "FALSE"
return ""
if cell_type == "e":
# Error value (e.g. #DIV/0!).
return v.text if (v is not None and v.text is not None) else "#ERROR"
# Numeric / general.
if v is not None and v.text is not None:
return v.text
return ""