Files
lingma-openai-gateway/tests/test_schema_normalization.py
GitHub Actions 12a4d9584e feat: harden cache reuse semantics and expand protocol regressions
Stabilize cross-protocol ask-mode/streaming behavior and reduce session-reuse branch collisions, then add focused docs/tests for multimodal normalization and pool/stats/config paths.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 14:26:11 +08:00

75 lines
2.6 KiB
Python

from __future__ import annotations
import unittest
from app.anthropic_schema import (
AnthropicMessagesRequest,
affinity_key_for_anthropic,
anthropic_to_internal_messages,
flatten_anthropic_content,
)
from app.openai_schema import flatten_content
class SchemaNormalizationTests(unittest.TestCase):
def test_openai_flatten_content_with_multimodal_parts(self) -> None:
out = flatten_content(
[
{"type": "text", "text": "hello"},
{"type": "image_url", "image_url": {"url": "x"}},
{"type": "input_image", "image_url": {"url": "y"}},
{"type": "input_audio", "input_audio": {"data": "x"}},
{"type": "text", "text": "world"},
]
)
self.assertEqual(out, "hello\n[image]\n[image]\n[audio]\nworld")
def test_anthropic_flatten_content_with_tool_blocks(self) -> None:
out = flatten_anthropic_content(
[
{"type": "text", "text": "before"},
{"type": "tool_use", "name": "search", "input": {"q": "hi"}},
{"type": "tool_result", "content": "ok"},
]
)
self.assertIn("before", out)
self.assertIn("[tool_use]", out)
self.assertIn("[tool_result] ok", out)
def test_anthropic_to_internal_messages_maps_system_and_messages(self) -> None:
req = AnthropicMessagesRequest(
model="org_auto",
max_tokens=64,
system="sys",
messages=[
{"role": "user", "content": "u1"},
{"role": "assistant", "content": "a1"},
],
)
out = anthropic_to_internal_messages(req)
self.assertEqual(out[0], {"role": "system", "content": "sys"})
self.assertEqual(out[1], {"role": "user", "content": "u1"})
self.assertEqual(out[2], {"role": "assistant", "content": "a1"})
def test_affinity_key_for_anthropic_priority(self) -> None:
req_user = AnthropicMessagesRequest(
model="org_auto",
max_tokens=64,
metadata={"user_id": "u-1"},
messages=[{"role": "user", "content": "hello"}],
)
self.assertEqual(affinity_key_for_anthropic(req_user), "u-1")
req_fallback = AnthropicMessagesRequest(
model="org_auto",
max_tokens=64,
messages=[{"role": "user", "content": "hello"}],
)
key = affinity_key_for_anthropic(req_fallback)
self.assertIsInstance(key, str)
self.assertTrue(key.startswith("first:"))
if __name__ == "__main__":
    # Allow running this test module directly as a script.
    unittest.main()