Compare commits

...

1 Commits

Author SHA1 Message Date
Teknium
c558f9a1d8 Port from Kilo-Org/kilocode#9434: strip historical media after compression
After context compression, the protected tail messages retain their
original image parts. When those include multi-MB pasted screenshots,
every subsequent API request re-ships the same base-64 blobs forever —
which can push the request past provider body-size limits and wedge the
session even though compression 'succeeded'.

Add _strip_historical_media() to agent/context_compressor.py. After the
summary is built, find the newest user message that carries an image
part and replace image parts in every earlier message with a short
text placeholder ('[Attached image — stripped after compression]').
The newest image-bearing user turn keeps its media so the model can
still analyse what the user just sent.

Handles all three multimodal shapes:
  - OpenAI chat.completions image_url
  - OpenAI Responses API input_image
  - Anthropic native {type: image, source: ...}

Includes 27 unit tests covering the helpers and the end-to-end
compress() integration, plus a manual E2E check confirming a ~4MB
two-image conversation shrinks to ~2MB after compression.
2026-05-04 17:06:12 -07:00
2 changed files with 382 additions and 0 deletions

View File

@@ -194,6 +194,114 @@ def _truncate_tool_call_args_json(args: str, head_chars: int = 200) -> str:
return json.dumps(shrunken, ensure_ascii=False)
_IMAGE_PART_TYPES = frozenset({"image_url", "input_image", "image"})
def _is_image_part(part: Any) -> bool:
"""True if ``part`` is a multimodal image content block.
Recognizes all three shapes the agent handles:
- OpenAI chat.completions: ``{"type": "image_url", "image_url": ...}``
- OpenAI Responses API: ``{"type": "input_image", "image_url": "..."}``
- Anthropic native: ``{"type": "image", "source": {...}}``
"""
if not isinstance(part, dict):
return False
return part.get("type") in _IMAGE_PART_TYPES
def _content_has_images(content: Any) -> bool:
"""True if a message's ``content`` is a multimodal list with image parts."""
if not isinstance(content, list):
return False
return any(_is_image_part(p) for p in content)
def _strip_images_from_content(content: Any) -> Any:
"""Return a copy of ``content`` with every image part replaced by a
short text placeholder.
- String content is returned unchanged.
- Non-list, non-string content is returned unchanged.
- List content: image parts become ``{"type": "text", "text": "[Attached
image — stripped after compression]"}``; other parts are preserved as-is.
Input is never mutated.
"""
if not isinstance(content, list):
return content
if not any(_is_image_part(p) for p in content):
return content
new_parts: List[Any] = []
for p in content:
if _is_image_part(p):
new_parts.append({
"type": "text",
"text": "[Attached image — stripped after compression]",
})
else:
new_parts.append(p)
return new_parts
def _strip_historical_media(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Replace image parts in older messages with placeholder text.
The anchor is the *last* user message that has any image content. Every
message before that anchor gets its image parts replaced with a short
placeholder so the outgoing request stops re-shipping the same multi-MB
base-64 image blobs on every turn.
If no user message carries images, the list is returned unchanged.
If the only user message with images is the very first one (nothing
earlier to strip), the list is returned unchanged.
Shallow copies of touched messages only; input is never mutated.
Port of Kilo-Org/kilocode#9434 (adapted for the OpenAI-style message
shape the hermes compressor emits).
"""
if not messages:
return messages
# Find the newest user message that carries at least one image part.
# We anchor on image-bearing user messages (not all user messages) so
# a plain text follow-up after a big-image turn still strips the old
# image — matching the problem kilocode#9434 set out to solve.
anchor = -1
for i in range(len(messages) - 1, -1, -1):
msg = messages[i]
if not isinstance(msg, dict):
continue
if msg.get("role") != "user":
continue
if _content_has_images(msg.get("content")):
anchor = i
break
if anchor <= 0:
# No image-bearing user message, or it's the very first message —
# nothing before it to strip.
return messages
changed = False
result: List[Dict[str, Any]] = []
for i, msg in enumerate(messages):
if i >= anchor or not isinstance(msg, dict):
result.append(msg)
continue
content = msg.get("content")
if not _content_has_images(content):
result.append(msg)
continue
new_msg = msg.copy()
new_msg["content"] = _strip_images_from_content(content)
result.append(new_msg)
changed = True
return result if changed else messages
def _summarize_tool_result(tool_name: str, tool_args: str, tool_content: str) -> str:
"""Create an informative 1-line summary of a tool call + result.
@@ -1408,6 +1516,14 @@ The user has requested that this compaction PRIORITISE preserving all informatio
compressed = self._sanitize_tool_pairs(compressed)
# Replace image parts in all compressed messages before the newest
# image-bearing user turn with a short text placeholder. Without
# this, tail messages keep their original multi-MB base-64 image
# payloads forever, which can push every subsequent API request
# past the provider's body-size limit and wedge the session.
# Port of Kilo-Org/kilocode#9434.
compressed = _strip_historical_media(compressed)
new_estimate = estimate_messages_tokens_rough(compressed)
saved_estimate = display_tokens - new_estimate

View File

@@ -0,0 +1,266 @@
"""Tests for post-compression historical-media stripping.
Port of Kilo-Org/kilocode#9434 (adapted for OpenAI-style message lists).
Without this pass, tail messages keep their original multi-MB base-64 image
payloads after context compression, and every subsequent request re-ships
them — sometimes breaching provider body-size limits and wedging the
session.
"""
from __future__ import annotations
from unittest.mock import patch
import pytest
from agent.context_compressor import (
ContextCompressor,
_content_has_images,
_is_image_part,
_strip_historical_media,
_strip_images_from_content,
)
IMG_URL = {
"type": "image_url",
"image_url": {"url": "data:image/png;base64," + ("A" * 1024)},
}
INPUT_IMG = {
"type": "input_image",
"image_url": "data:image/png;base64," + ("B" * 1024),
}
ANTHROPIC_IMG = {
"type": "image",
"source": {"type": "base64", "media_type": "image/png", "data": "C" * 1024},
}
TEXT = {"type": "text", "text": "hi"}
INPUT_TEXT = {"type": "input_text", "text": "hi"}
class TestIsImagePart:
def test_openai_chat_shape(self):
assert _is_image_part(IMG_URL) is True
def test_openai_responses_shape(self):
assert _is_image_part(INPUT_IMG) is True
def test_anthropic_native_shape(self):
assert _is_image_part(ANTHROPIC_IMG) is True
def test_text_part_is_not_image(self):
assert _is_image_part(TEXT) is False
assert _is_image_part(INPUT_TEXT) is False
def test_non_dict_rejected(self):
assert _is_image_part("image") is False
assert _is_image_part(None) is False
assert _is_image_part(42) is False
class TestContentHasImages:
def test_string_content(self):
assert _content_has_images("a string") is False
def test_empty_list(self):
assert _content_has_images([]) is False
def test_text_only_list(self):
assert _content_has_images([TEXT, TEXT]) is False
def test_list_with_image(self):
assert _content_has_images([TEXT, IMG_URL]) is True
def test_none(self):
assert _content_has_images(None) is False
class TestStripImagesFromContent:
def test_string_passthrough(self):
assert _strip_images_from_content("hello") == "hello"
def test_none_passthrough(self):
assert _strip_images_from_content(None) is None
def test_text_only_passthrough(self):
parts = [TEXT, {"type": "text", "text": "world"}]
assert _strip_images_from_content(parts) == parts
def test_replaces_image_with_placeholder(self):
parts = [TEXT, IMG_URL]
out = _strip_images_from_content(parts)
assert len(out) == 2
assert out[0] == TEXT
assert out[1] == {
"type": "text",
"text": "[Attached image — stripped after compression]",
}
def test_does_not_mutate_input(self):
parts = [IMG_URL, TEXT]
_ = _strip_images_from_content(parts)
assert parts[0] is IMG_URL # original list untouched
assert parts[1] is TEXT
def test_handles_all_three_shapes(self):
parts = [IMG_URL, INPUT_IMG, ANTHROPIC_IMG, TEXT]
out = _strip_images_from_content(parts)
assert sum(1 for p in out if p.get("type") == "text") == 4
assert not any(_is_image_part(p) for p in out)
class TestStripHistoricalMedia:
def test_empty_passthrough(self):
assert _strip_historical_media([]) == []
def test_no_images_anywhere(self):
msgs = [
{"role": "user", "content": "hi"},
{"role": "assistant", "content": "hey"},
{"role": "user", "content": "bye"},
]
assert _strip_historical_media(msgs) is msgs # identity — no copy
def test_single_image_user_only_first_message(self):
# Only image-bearing user is the first message — nothing before it.
msgs = [
{"role": "user", "content": [TEXT, IMG_URL]},
{"role": "assistant", "content": "ok"},
]
out = _strip_historical_media(msgs)
assert out is msgs # no-op
# Image still there.
assert _content_has_images(out[0]["content"])
def test_strips_older_user_image_keeps_newest(self):
msgs = [
{"role": "user", "content": [TEXT, IMG_URL]}, # old — strip
{"role": "assistant", "content": "looked at it"},
{"role": "user", "content": [TEXT, INPUT_IMG]}, # newest — keep
]
out = _strip_historical_media(msgs)
assert out is not msgs # new list
# First message's image was replaced
assert not _content_has_images(out[0]["content"])
# Newest user still has its image
assert _content_has_images(out[2]["content"])
def test_strips_assistant_and_tool_images_before_anchor(self):
msgs = [
{"role": "user", "content": [TEXT, IMG_URL]}, # old user
{"role": "assistant", "content": [TEXT, IMG_URL]}, # old assistant
{"role": "tool", "content": [TEXT, IMG_URL], "tool_call_id": "t1"},
{"role": "user", "content": [TEXT, IMG_URL]}, # newest user — keep
]
out = _strip_historical_media(msgs)
for i in range(3):
assert not _content_has_images(out[i]["content"]), f"msg {i} still has image"
assert _content_has_images(out[3]["content"])
def test_text_only_newest_user_still_strips_older_images(self):
# The anchor is "newest user WITH images". If the newest user is
# text-only, we fall back to the previous image-bearing user turn.
msgs = [
{"role": "user", "content": [TEXT, IMG_URL]},
{"role": "assistant", "content": "ok"},
{"role": "user", "content": [TEXT, IMG_URL]}, # anchor
{"role": "assistant", "content": "done"},
{"role": "user", "content": "follow-up text only"},
]
out = _strip_historical_media(msgs)
# First image-bearing user (index 0) was stripped — it was before the
# newest image-bearing user (index 2).
assert not _content_has_images(out[0]["content"])
# Anchor (index 2) keeps its image.
assert _content_has_images(out[2]["content"])
def test_no_image_bearing_user_is_noop(self):
msgs = [
{"role": "user", "content": "first"},
{"role": "assistant", "content": [TEXT, IMG_URL]}, # assistant image only
{"role": "user", "content": "second"},
]
out = _strip_historical_media(msgs)
# No image-bearing user anchor → no stripping.
assert out is msgs
assert _content_has_images(out[1]["content"])
def test_does_not_mutate_input_messages(self):
msg0 = {"role": "user", "content": [TEXT, IMG_URL]}
msg1 = {"role": "user", "content": [TEXT, IMG_URL]}
msgs = [msg0, msg1]
_ = _strip_historical_media(msgs)
# Originals untouched
assert _content_has_images(msg0["content"])
assert _content_has_images(msg1["content"])
def test_idempotent(self):
msgs = [
{"role": "user", "content": [TEXT, IMG_URL]},
{"role": "assistant", "content": "k"},
{"role": "user", "content": [TEXT, IMG_URL]},
]
first = _strip_historical_media(msgs)
second = _strip_historical_media(first)
# Second pass is a no-op — no images left before the anchor.
assert second is first
def test_non_dict_messages_pass_through(self):
msgs = [
"not-a-dict", # shouldn't crash
{"role": "user", "content": [TEXT, IMG_URL]},
{"role": "assistant", "content": "ok"},
{"role": "user", "content": [TEXT, IMG_URL]},
]
out = _strip_historical_media(msgs)
assert out[0] == "not-a-dict"
# Image-bearing user at index 1 is before the anchor (index 3) → stripped.
assert not _content_has_images(out[1]["content"])
class TestCompressIntegration:
"""Verify the stripping runs inside ContextCompressor.compress()."""
@pytest.fixture
def compressor(self):
with patch("agent.context_compressor.get_model_context_length", return_value=100_000):
c = ContextCompressor(
model="test/model",
threshold_percent=0.50,
protect_first_n=1,
protect_last_n=2,
quiet_mode=True,
)
return c
def test_compress_strips_historical_images(self, compressor):
# Enough messages to trigger the summarize path. protect_first_n=1 +
# protect_last_n=2 + a middle window of at least 3 with a summary.
msgs = [
{"role": "system", "content": "sys"},
{"role": "user", "content": [TEXT, IMG_URL]}, # old image-bearing user
{"role": "assistant", "content": "looked at it"},
{"role": "user", "content": "follow-up"},
{"role": "assistant", "content": "ack"},
{"role": "user", "content": "more"},
{"role": "assistant", "content": "ok"},
{"role": "user", "content": [TEXT, IMG_URL]}, # newest image-bearing user (tail)
{"role": "assistant", "content": "done"},
]
# Bypass the real LLM summary — return a stub so compress() proceeds.
with patch.object(compressor, "_generate_summary", return_value="SUMMARY TEXT"):
out = compressor.compress(msgs, current_tokens=60_000)
# Newest user turn with image should still have it (it's in the tail).
user_imgs = [m for m in out if m.get("role") == "user" and _content_has_images(m.get("content"))]
assert len(user_imgs) == 1, (
"Expected exactly one user message with images after compression "
f"(the newest one); got {len(user_imgs)}"
)
# No assistant or tool messages should carry images either.
for m in out:
if m is user_imgs[0]:
continue
assert not _content_has_images(m.get("content")), (
f"Stale image in {m.get('role')!r} message after compression"
)