Add a proxy-side tool emulation layer so Lingma requests can surface stable OpenAI tool_calls and Anthropic tool_use blocks even when upstream tool events are missing or inconsistent. Constraint: Keep native Lingma tool event bridging as the first path and layer emulation as a fallback Rejected: Depend exclusively on Lingma native tool/invoke events | tool visibility remains inconsistent across models and transports Confidence: high Scope-risk: moderate
486 lines
14 KiB
Python
486 lines
14 KiB
Python
from __future__ import annotations
|
|
|
|
import ast
|
|
import json
|
|
import re
|
|
import uuid
|
|
from typing import Any
|
|
|
|
|
|
def _json_string(value: Any) -> str:
|
|
if isinstance(value, str):
|
|
return value
|
|
try:
|
|
return json.dumps(value if value is not None else {}, ensure_ascii=False)
|
|
except Exception:
|
|
return "{}"
|
|
|
|
|
|
def _openai_tool_name(tool: Any) -> str | None:
|
|
if not isinstance(tool, dict):
|
|
return None
|
|
if tool.get("type") == "function":
|
|
fn = tool.get("function")
|
|
if isinstance(fn, dict):
|
|
name = fn.get("name")
|
|
if isinstance(name, str) and name.strip():
|
|
return name.strip()
|
|
name = tool.get("name")
|
|
if isinstance(name, str) and name.strip():
|
|
return name.strip()
|
|
return None
|
|
|
|
|
|
def _anthropic_tool_name(tool: Any) -> str | None:
|
|
if not isinstance(tool, dict):
|
|
return None
|
|
name = tool.get("name")
|
|
if isinstance(name, str) and name.strip():
|
|
return name.strip()
|
|
fn = tool.get("function")
|
|
if isinstance(fn, dict):
|
|
nested_name = fn.get("name")
|
|
if isinstance(nested_name, str) and nested_name.strip():
|
|
return nested_name.strip()
|
|
return None
|
|
|
|
|
|
def _tool_event_allowed(
|
|
tool_name: str,
|
|
tool_config: dict[str, Any] | None,
|
|
*,
|
|
forced_tool_name: str | None = None,
|
|
) -> bool:
|
|
if not (
|
|
tool_config
|
|
and isinstance(tool_config.get("tools"), list)
|
|
and tool_config.get("tools")
|
|
):
|
|
return True
|
|
for tool in tool_config.get("tools") or []:
|
|
if tool_name == _anthropic_tool_name(tool) or tool_name == _openai_tool_name(
|
|
tool
|
|
):
|
|
return True
|
|
return bool(forced_tool_name and tool_name == forced_tool_name)
|
|
|
|
|
|
def _allowed_tool_event(
|
|
tool: Any,
|
|
*,
|
|
tool_config: dict[str, Any] | None,
|
|
forced_tool_name: str | None = None,
|
|
) -> dict[str, Any] | None:
|
|
if not isinstance(tool, dict):
|
|
return None
|
|
tool_name = str(tool.get("name") or "")
|
|
if not _tool_event_allowed(
|
|
tool_name, tool_config, forced_tool_name=forced_tool_name
|
|
):
|
|
return None
|
|
return tool
|
|
|
|
|
|
def _allowed_tool_events(
|
|
tool_events: Any,
|
|
*,
|
|
tool_config: dict[str, Any] | None,
|
|
forced_tool_name: str | None = None,
|
|
) -> list[dict[str, Any]]:
|
|
if not isinstance(tool_events, list):
|
|
return []
|
|
out: list[dict[str, Any]] = []
|
|
for item in tool_events:
|
|
allowed = _allowed_tool_event(
|
|
item,
|
|
tool_config=tool_config,
|
|
forced_tool_name=forced_tool_name,
|
|
)
|
|
if allowed is not None:
|
|
out.append(allowed)
|
|
return out
|
|
|
|
|
|
def _allowed_stream_tool_event(
|
|
event: Any,
|
|
*,
|
|
tool_config: dict[str, Any] | None,
|
|
forced_tool_name: str | None = None,
|
|
) -> dict[str, Any] | None:
|
|
if not isinstance(event, dict) or event.get("type") != "tool":
|
|
return None
|
|
tool = event.get("tool")
|
|
if not isinstance(tool, dict):
|
|
return None
|
|
tool_name = str(tool.get("name") or "")
|
|
if not _tool_event_allowed(
|
|
tool_name, tool_config, forced_tool_name=forced_tool_name
|
|
):
|
|
return None
|
|
return tool
|
|
|
|
|
|
def _openai_forced_tool_name(tool_choice: Any) -> str | None:
|
|
if not isinstance(tool_choice, dict):
|
|
return None
|
|
fn = tool_choice.get("function")
|
|
if isinstance(fn, dict):
|
|
name = fn.get("name")
|
|
if isinstance(name, str) and name.strip():
|
|
return name.strip()
|
|
return None
|
|
|
|
|
|
def _anthropic_forced_tool_name(tool_choice: Any) -> str | None:
|
|
if not isinstance(tool_choice, dict):
|
|
return None
|
|
if tool_choice.get("type") == "tool":
|
|
name = tool_choice.get("name")
|
|
if isinstance(name, str) and name.strip():
|
|
return name.strip()
|
|
fn = tool_choice.get("function")
|
|
if isinstance(fn, dict):
|
|
name = fn.get("name")
|
|
if isinstance(name, str) and name.strip():
|
|
return name.strip()
|
|
return None
|
|
|
|
|
|
def _json_object_from_text(text: str) -> dict[str, Any] | None:
|
|
raw = text.strip()
|
|
if not raw:
|
|
return None
|
|
if raw.startswith("```") and raw.endswith("```"):
|
|
raw = raw[3:-3].strip()
|
|
if raw.lower().startswith("json"):
|
|
raw = raw[4:].strip()
|
|
try:
|
|
parsed = json.loads(raw)
|
|
except Exception:
|
|
return None
|
|
return parsed if isinstance(parsed, dict) else None
|
|
|
|
|
|
def _json_tool_candidate_from_text(text: str) -> dict[str, Any] | None:
|
|
raw = text.strip()
|
|
if not raw:
|
|
return None
|
|
if raw.startswith("```") and raw.endswith("```"):
|
|
raw = raw[3:-3].strip()
|
|
if raw.lower().startswith("json"):
|
|
raw = raw[4:].strip()
|
|
try:
|
|
parsed = json.loads(raw)
|
|
except Exception:
|
|
return None
|
|
if isinstance(parsed, dict):
|
|
return parsed
|
|
if isinstance(parsed, list) and parsed:
|
|
first = parsed[0]
|
|
if isinstance(first, dict):
|
|
return first
|
|
return None
|
|
|
|
|
|
def _extract_tool_calls_from_text(text: str) -> list[dict[str, Any]] | None:
|
|
text = text.strip()
|
|
match = re.search(r"\[tool_calls\]\s*(\[.*\])", text, re.DOTALL)
|
|
if not match:
|
|
return None
|
|
try:
|
|
parsed = json.loads(match.group(1))
|
|
if isinstance(parsed, list) and len(parsed) > 0 and isinstance(parsed[0], dict):
|
|
return parsed
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
def _extract_hash_tool_call_event_from_text(
|
|
text: str,
|
|
*,
|
|
forced_tool_name: str | None = None,
|
|
) -> dict[str, Any] | None:
|
|
raw = (text or "").strip()
|
|
if not raw:
|
|
return None
|
|
match = re.search(
|
|
r"#Tool Call\s*```([A-Za-z0-9_\-.]+)\s*(\{.*?\})\s*```",
|
|
raw,
|
|
flags=re.S,
|
|
)
|
|
if not match:
|
|
return None
|
|
name = match.group(1).strip()
|
|
if forced_tool_name and name != forced_tool_name:
|
|
return None
|
|
try:
|
|
arguments = json.loads(match.group(2))
|
|
except Exception:
|
|
return None
|
|
if not isinstance(arguments, dict):
|
|
return None
|
|
return {"name": name, "input": arguments}
|
|
|
|
|
|
def _tool_code_single_arg_name(
|
|
tools: list[dict[str, Any]] | None, forced_tool_name: str
|
|
) -> str | None:
|
|
if not isinstance(tools, list):
|
|
return None
|
|
for tool in tools:
|
|
if not isinstance(tool, dict):
|
|
continue
|
|
schema: dict[str, Any] | None = None
|
|
if tool.get("type") == "function":
|
|
fn = tool.get("function")
|
|
if isinstance(fn, dict) and fn.get("name") == forced_tool_name:
|
|
params = fn.get("parameters")
|
|
if isinstance(params, dict):
|
|
schema = params
|
|
elif tool.get("name") == forced_tool_name:
|
|
input_schema = tool.get("input_schema")
|
|
if isinstance(input_schema, dict):
|
|
schema = input_schema
|
|
if not isinstance(schema, dict):
|
|
continue
|
|
properties = schema.get("properties")
|
|
if not isinstance(properties, dict) or len(properties) != 1:
|
|
return None
|
|
only_name = next(iter(properties.keys()), None)
|
|
if isinstance(only_name, str) and only_name.strip():
|
|
return only_name
|
|
return None
|
|
return None
|
|
|
|
|
|
def _tool_code_object_from_text(
|
|
text: str,
|
|
forced_tool_name: str,
|
|
*,
|
|
single_arg_name: str | None = None,
|
|
) -> dict[str, Any] | None:
|
|
raw = text.strip()
|
|
if not raw.startswith("```") or not raw.endswith("```"):
|
|
return None
|
|
lines = raw.splitlines()
|
|
if len(lines) < 2:
|
|
return None
|
|
fence = lines[0].strip().lower()
|
|
language = fence[3:].strip()
|
|
if language and language not in {"tool_code", "python", "py"}:
|
|
return None
|
|
body = "\n".join(lines[1:-1]).strip()
|
|
try:
|
|
parsed = ast.parse(body, mode="eval")
|
|
except Exception:
|
|
return None
|
|
call = parsed.body
|
|
if not isinstance(call, ast.Call):
|
|
return None
|
|
if not isinstance(call.func, ast.Name) or call.func.id != forced_tool_name:
|
|
return None
|
|
arguments: dict[str, Any] = {}
|
|
if call.args:
|
|
if len(call.args) != 1 or call.keywords or not single_arg_name:
|
|
return None
|
|
try:
|
|
arguments[single_arg_name] = ast.literal_eval(call.args[0])
|
|
except Exception:
|
|
return None
|
|
return {"arguments": arguments}
|
|
for kw in call.keywords:
|
|
if kw.arg is None:
|
|
return None
|
|
try:
|
|
arguments[kw.arg] = ast.literal_eval(kw.value)
|
|
except Exception:
|
|
return None
|
|
return {"arguments": arguments}
|
|
|
|
|
|
def _forced_tool_event_from_text(
|
|
text: str,
|
|
forced_tool_name: str,
|
|
*,
|
|
single_arg_name: str | None = None,
|
|
) -> dict[str, Any] | None:
|
|
parsed = _json_tool_candidate_from_text(text)
|
|
if parsed is None:
|
|
parsed = _tool_code_object_from_text(
|
|
text, forced_tool_name, single_arg_name=single_arg_name
|
|
)
|
|
if parsed is None:
|
|
return None
|
|
|
|
explicit_name: Any = parsed.get("name") or parsed.get("tool")
|
|
fn = parsed.get("function")
|
|
if explicit_name is None and isinstance(fn, dict):
|
|
explicit_name = fn.get("name")
|
|
if explicit_name is not None and str(explicit_name) != forced_tool_name:
|
|
return None
|
|
|
|
tool_input: Any = None
|
|
if "input" in parsed:
|
|
tool_input = parsed.get("input")
|
|
elif "arguments" in parsed:
|
|
args = parsed.get("arguments")
|
|
if isinstance(args, str):
|
|
try:
|
|
tool_input = json.loads(args)
|
|
except Exception:
|
|
return None
|
|
else:
|
|
tool_input = args
|
|
elif isinstance(fn, dict) and "arguments" in fn:
|
|
args = fn.get("arguments")
|
|
if isinstance(args, str):
|
|
try:
|
|
tool_input = json.loads(args)
|
|
except Exception:
|
|
return None
|
|
else:
|
|
tool_input = args
|
|
else:
|
|
reserved = {"name", "tool", "function", "arguments", "input", "result"}
|
|
tool_input = {k: v for k, v in parsed.items() if k not in reserved}
|
|
|
|
event: dict[str, Any] = {
|
|
"name": forced_tool_name,
|
|
"input": tool_input if tool_input is not None else {},
|
|
}
|
|
if "result" in parsed:
|
|
event["result"] = parsed.get("result")
|
|
return event
|
|
|
|
|
|
def _forced_tool_fallback_event(
|
|
text: str,
|
|
*,
|
|
forced_tool_name: str | None,
|
|
tools: list[dict[str, Any]] | None = None,
|
|
) -> dict[str, Any] | None:
|
|
if not forced_tool_name:
|
|
return None
|
|
return _forced_tool_event_from_text(
|
|
text,
|
|
forced_tool_name,
|
|
single_arg_name=_tool_code_single_arg_name(tools, forced_tool_name),
|
|
)
|
|
|
|
|
|
def _declared_tool_names(tools: list[dict[str, Any]] | None) -> list[str]:
|
|
if not isinstance(tools, list):
|
|
return []
|
|
out: list[str] = []
|
|
for tool in tools:
|
|
name = _openai_tool_name(tool) or _anthropic_tool_name(tool)
|
|
if name and name not in out:
|
|
out.append(name)
|
|
return out
|
|
|
|
|
|
def _infer_tool_event_from_declared_tools(
|
|
text: str,
|
|
*,
|
|
tools: list[dict[str, Any]] | None,
|
|
) -> dict[str, Any] | None:
|
|
for tool_name in _declared_tool_names(tools):
|
|
inferred = _extract_function_call_event_from_text(
|
|
text,
|
|
forced_tool_name=tool_name,
|
|
)
|
|
if inferred is not None:
|
|
return inferred
|
|
inferred = _extract_hash_tool_call_event_from_text(
|
|
text,
|
|
forced_tool_name=tool_name,
|
|
)
|
|
if inferred is not None:
|
|
return inferred
|
|
inferred = _forced_tool_fallback_event(
|
|
text,
|
|
forced_tool_name=tool_name,
|
|
tools=tools,
|
|
)
|
|
if inferred is not None:
|
|
return inferred
|
|
return None
|
|
|
|
|
|
def _openai_tool_call(
|
|
tool: dict[str, Any], *, forced_id: str | None = None
|
|
) -> dict[str, Any]:
|
|
return {
|
|
"id": str(tool.get("id") or forced_id or f"call_{uuid.uuid4().hex}"),
|
|
"type": "function",
|
|
"function": {
|
|
"name": str(tool.get("name") or "tool"),
|
|
"arguments": _json_string(tool.get("input")),
|
|
},
|
|
}
|
|
|
|
|
|
def _extract_function_call_event_from_text(
|
|
text: str,
|
|
*,
|
|
forced_tool_name: str | None,
|
|
) -> dict[str, Any] | None:
|
|
raw = (text or "").strip()
|
|
if not raw:
|
|
return None
|
|
m = re.search(r"<function_calls>\s*(\{.*?\})\s*</function_calls>", raw, flags=re.S)
|
|
if not m:
|
|
return None
|
|
try:
|
|
payload = json.loads(m.group(1))
|
|
except Exception:
|
|
return None
|
|
if not isinstance(payload, dict):
|
|
return None
|
|
name = payload.get("name")
|
|
if not isinstance(name, str) or not name.strip():
|
|
return None
|
|
name = name.strip()
|
|
if forced_tool_name and name != forced_tool_name:
|
|
return None
|
|
arguments = payload.get("arguments")
|
|
if isinstance(arguments, str):
|
|
try:
|
|
arguments = json.loads(arguments)
|
|
except Exception:
|
|
return None
|
|
if arguments is None:
|
|
arguments = {}
|
|
if not isinstance(arguments, dict):
|
|
return None
|
|
return {"name": name, "input": arguments}
|
|
|
|
|
|
def _anthropic_tool_use_block(
|
|
tool: dict[str, Any], *, forced_id: str | None = None
|
|
) -> dict[str, Any]:
|
|
return {
|
|
"type": "tool_use",
|
|
"id": str(tool.get("id") or forced_id or f"toolu_{uuid.uuid4().hex}"),
|
|
"name": str(tool.get("name") or "tool"),
|
|
"input": tool.get("input") if tool.get("input") is not None else {},
|
|
}
|
|
|
|
|
|
def _anthropic_tool_result_block(
|
|
tool: dict[str, Any], *, forced_id: str | None = None
|
|
) -> dict[str, Any] | None:
|
|
if "result" not in tool:
|
|
return None
|
|
result = tool.get("result")
|
|
if isinstance(result, str):
|
|
content: Any = result
|
|
else:
|
|
content = _json_string(result)
|
|
return {
|
|
"type": "tool_result",
|
|
"tool_use_id": str(tool.get("id") or forced_id or ""),
|
|
"content": content,
|
|
}
|