Deduplicate allowlist filtering and forced-tool fallback parsing across the OpenAI and Anthropic non-stream bridge paths while preserving existing wire behavior. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
311 lines
9.2 KiB
Python
311 lines
9.2 KiB
Python
from __future__ import annotations
|
|
|
|
import ast
|
|
import json
|
|
import uuid
|
|
from typing import Any
|
|
|
|
|
|
def _json_string(value: Any) -> str:
|
|
if isinstance(value, str):
|
|
return value
|
|
try:
|
|
return json.dumps(value if value is not None else {}, ensure_ascii=False)
|
|
except Exception:
|
|
return "{}"
|
|
|
|
|
|
def _openai_tool_name(tool: Any) -> str | None:
|
|
if not isinstance(tool, dict):
|
|
return None
|
|
if tool.get("type") == "function":
|
|
fn = tool.get("function")
|
|
if isinstance(fn, dict):
|
|
name = fn.get("name")
|
|
if isinstance(name, str) and name.strip():
|
|
return name.strip()
|
|
name = tool.get("name")
|
|
if isinstance(name, str) and name.strip():
|
|
return name.strip()
|
|
return None
|
|
|
|
|
|
def _anthropic_tool_name(tool: Any) -> str | None:
|
|
if not isinstance(tool, dict):
|
|
return None
|
|
name = tool.get("name")
|
|
if isinstance(name, str) and name.strip():
|
|
return name.strip()
|
|
fn = tool.get("function")
|
|
if isinstance(fn, dict):
|
|
nested_name = fn.get("name")
|
|
if isinstance(nested_name, str) and nested_name.strip():
|
|
return nested_name.strip()
|
|
return None
|
|
|
|
|
|
def _tool_event_allowed(
|
|
tool_name: str,
|
|
tool_config: dict[str, Any] | None,
|
|
*,
|
|
forced_tool_name: str | None = None,
|
|
) -> bool:
|
|
if not (tool_config and isinstance(tool_config.get("tools"), list) and tool_config.get("tools")):
|
|
return True
|
|
for tool in tool_config.get("tools") or []:
|
|
if tool_name == _anthropic_tool_name(tool) or tool_name == _openai_tool_name(tool):
|
|
return True
|
|
return bool(forced_tool_name and tool_name == forced_tool_name)
|
|
|
|
|
|
def _allowed_tool_event(
|
|
tool: Any,
|
|
*,
|
|
tool_config: dict[str, Any] | None,
|
|
forced_tool_name: str | None = None,
|
|
) -> dict[str, Any] | None:
|
|
if not isinstance(tool, dict):
|
|
return None
|
|
tool_name = str(tool.get("name") or "")
|
|
if not _tool_event_allowed(tool_name, tool_config, forced_tool_name=forced_tool_name):
|
|
return None
|
|
return tool
|
|
|
|
|
|
def _allowed_tool_events(
|
|
tool_events: Any,
|
|
*,
|
|
tool_config: dict[str, Any] | None,
|
|
forced_tool_name: str | None = None,
|
|
) -> list[dict[str, Any]]:
|
|
if not isinstance(tool_events, list):
|
|
return []
|
|
out: list[dict[str, Any]] = []
|
|
for item in tool_events:
|
|
allowed = _allowed_tool_event(
|
|
item,
|
|
tool_config=tool_config,
|
|
forced_tool_name=forced_tool_name,
|
|
)
|
|
if allowed is not None:
|
|
out.append(allowed)
|
|
return out
|
|
|
|
|
|
def _openai_forced_tool_name(tool_choice: Any) -> str | None:
|
|
if not isinstance(tool_choice, dict):
|
|
return None
|
|
fn = tool_choice.get("function")
|
|
if isinstance(fn, dict):
|
|
name = fn.get("name")
|
|
if isinstance(name, str) and name.strip():
|
|
return name.strip()
|
|
return None
|
|
|
|
|
|
def _anthropic_forced_tool_name(tool_choice: Any) -> str | None:
|
|
if not isinstance(tool_choice, dict):
|
|
return None
|
|
if tool_choice.get("type") == "tool":
|
|
name = tool_choice.get("name")
|
|
if isinstance(name, str) and name.strip():
|
|
return name.strip()
|
|
fn = tool_choice.get("function")
|
|
if isinstance(fn, dict):
|
|
name = fn.get("name")
|
|
if isinstance(name, str) and name.strip():
|
|
return name.strip()
|
|
return None
|
|
|
|
|
|
def _json_object_from_text(text: str) -> dict[str, Any] | None:
|
|
raw = text.strip()
|
|
if not raw:
|
|
return None
|
|
if raw.startswith("```") and raw.endswith("```"):
|
|
raw = raw[3:-3].strip()
|
|
if raw.lower().startswith("json"):
|
|
raw = raw[4:].strip()
|
|
try:
|
|
parsed = json.loads(raw)
|
|
except Exception:
|
|
return None
|
|
return parsed if isinstance(parsed, dict) else None
|
|
|
|
|
|
def _tool_code_single_arg_name(tools: list[dict[str, Any]] | None, forced_tool_name: str) -> str | None:
|
|
if not isinstance(tools, list):
|
|
return None
|
|
for tool in tools:
|
|
if not isinstance(tool, dict):
|
|
continue
|
|
schema: dict[str, Any] | None = None
|
|
if tool.get("type") == "function":
|
|
fn = tool.get("function")
|
|
if isinstance(fn, dict) and fn.get("name") == forced_tool_name:
|
|
params = fn.get("parameters")
|
|
if isinstance(params, dict):
|
|
schema = params
|
|
elif tool.get("name") == forced_tool_name:
|
|
input_schema = tool.get("input_schema")
|
|
if isinstance(input_schema, dict):
|
|
schema = input_schema
|
|
if not isinstance(schema, dict):
|
|
continue
|
|
properties = schema.get("properties")
|
|
if not isinstance(properties, dict) or len(properties) != 1:
|
|
return None
|
|
only_name = next(iter(properties.keys()), None)
|
|
if isinstance(only_name, str) and only_name.strip():
|
|
return only_name
|
|
return None
|
|
return None
|
|
|
|
|
|
def _tool_code_object_from_text(
|
|
text: str,
|
|
forced_tool_name: str,
|
|
*,
|
|
single_arg_name: str | None = None,
|
|
) -> dict[str, Any] | None:
|
|
raw = text.strip()
|
|
if not raw.startswith("```tool_code") or not raw.endswith("```"):
|
|
return None
|
|
lines = raw.splitlines()
|
|
if len(lines) < 2:
|
|
return None
|
|
body = "\n".join(lines[1:-1]).strip()
|
|
try:
|
|
parsed = ast.parse(body, mode="eval")
|
|
except Exception:
|
|
return None
|
|
call = parsed.body
|
|
if not isinstance(call, ast.Call):
|
|
return None
|
|
if not isinstance(call.func, ast.Name) or call.func.id != forced_tool_name:
|
|
return None
|
|
arguments: dict[str, Any] = {}
|
|
if call.args:
|
|
if len(call.args) != 1 or call.keywords or not single_arg_name:
|
|
return None
|
|
try:
|
|
arguments[single_arg_name] = ast.literal_eval(call.args[0])
|
|
except Exception:
|
|
return None
|
|
return {"arguments": arguments}
|
|
for kw in call.keywords:
|
|
if kw.arg is None:
|
|
return None
|
|
try:
|
|
arguments[kw.arg] = ast.literal_eval(kw.value)
|
|
except Exception:
|
|
return None
|
|
return {"arguments": arguments}
|
|
|
|
|
|
def _forced_tool_event_from_text(
|
|
text: str,
|
|
forced_tool_name: str,
|
|
*,
|
|
single_arg_name: str | None = None,
|
|
) -> dict[str, Any] | None:
|
|
parsed = _json_object_from_text(text)
|
|
if parsed is None:
|
|
parsed = _tool_code_object_from_text(text, forced_tool_name, single_arg_name=single_arg_name)
|
|
if parsed is None:
|
|
return None
|
|
|
|
explicit_name: Any = parsed.get("name") or parsed.get("tool")
|
|
fn = parsed.get("function")
|
|
if explicit_name is None and isinstance(fn, dict):
|
|
explicit_name = fn.get("name")
|
|
if explicit_name is not None and str(explicit_name) != forced_tool_name:
|
|
return None
|
|
|
|
tool_input: Any = None
|
|
if "input" in parsed:
|
|
tool_input = parsed.get("input")
|
|
elif "arguments" in parsed:
|
|
args = parsed.get("arguments")
|
|
if isinstance(args, str):
|
|
try:
|
|
tool_input = json.loads(args)
|
|
except Exception:
|
|
return None
|
|
else:
|
|
tool_input = args
|
|
elif isinstance(fn, dict) and "arguments" in fn:
|
|
args = fn.get("arguments")
|
|
if isinstance(args, str):
|
|
try:
|
|
tool_input = json.loads(args)
|
|
except Exception:
|
|
return None
|
|
else:
|
|
tool_input = args
|
|
else:
|
|
reserved = {"name", "tool", "function", "arguments", "input", "result"}
|
|
tool_input = {k: v for k, v in parsed.items() if k not in reserved}
|
|
|
|
event: dict[str, Any] = {
|
|
"name": forced_tool_name,
|
|
"input": tool_input if tool_input is not None else {},
|
|
}
|
|
if "result" in parsed:
|
|
event["result"] = parsed.get("result")
|
|
return event
|
|
|
|
|
|
def _forced_tool_fallback_event(
|
|
text: str,
|
|
*,
|
|
forced_tool_name: str | None,
|
|
tools: list[dict[str, Any]] | None = None,
|
|
) -> dict[str, Any] | None:
|
|
if not forced_tool_name:
|
|
return None
|
|
return _forced_tool_event_from_text(
|
|
text,
|
|
forced_tool_name,
|
|
single_arg_name=_tool_code_single_arg_name(tools, forced_tool_name),
|
|
)
|
|
|
|
|
|
def _openai_tool_call(tool: dict[str, Any], *, forced_id: str | None = None) -> dict[str, Any]:
|
|
return {
|
|
"id": str(tool.get("id") or forced_id or f"call_{uuid.uuid4().hex}"),
|
|
"type": "function",
|
|
"function": {
|
|
"name": str(tool.get("name") or "tool"),
|
|
"arguments": _json_string(tool.get("input")),
|
|
},
|
|
}
|
|
|
|
|
|
def _anthropic_tool_use_block(
|
|
tool: dict[str, Any], *, forced_id: str | None = None
|
|
) -> dict[str, Any]:
|
|
return {
|
|
"type": "tool_use",
|
|
"id": str(tool.get("id") or forced_id or f"toolu_{uuid.uuid4().hex}"),
|
|
"name": str(tool.get("name") or "tool"),
|
|
"input": tool.get("input") if tool.get("input") is not None else {},
|
|
}
|
|
|
|
|
|
def _anthropic_tool_result_block(
|
|
tool: dict[str, Any], *, forced_id: str | None = None
|
|
) -> dict[str, Any] | None:
|
|
if "result" not in tool:
|
|
return None
|
|
result = tool.get("result")
|
|
if isinstance(result, str):
|
|
content: Any = result
|
|
else:
|
|
content = _json_string(result)
|
|
return {
|
|
"type": "tool_result",
|
|
"tool_use_id": str(tool.get("id") or forced_id or ""),
|
|
"content": content,
|
|
}
|