feat: add emulated tool-calling bridge for Lingma

Add a proxy-side tool emulation layer so Lingma requests can surface stable OpenAI tool_calls and Anthropic tool_use blocks even when upstream tool events are missing or inconsistent.

Constraint: Keep native Lingma tool event bridging as the first path and layer emulation as a fallback

Rejected: Depend exclusively on Lingma native tool/invoke events | tool visibility remains inconsistent across models and transports

Confidence: high

Scope-risk: moderate
This commit is contained in:
mmc
2026-05-07 18:10:01 +08:00
parent 5911e4322e
commit 94a8025ae5
11 changed files with 1808 additions and 4 deletions

657
app/http/tool_emulation.py Normal file
View File

@@ -0,0 +1,657 @@
from __future__ import annotations
import json
import re
import uuid
from dataclasses import dataclass
from typing import Any
@dataclass
class EmulatedToolDef:
    """Protocol-neutral description of a single tool offered to the model."""

    # Tool name exactly as declared in the client request.
    name: str
    # Free-text description; the extractors store "" when none was given.
    description: str
    # Schema dict describing the tool arguments ({} when none was supplied).
    input_schema: dict[str, Any]
@dataclass
class EmulatedToolChoice:
    """Normalised tool-choice directive shared by the OpenAI and Anthropic paths."""

    # One of "auto", "none", "any" or "tool" as produced by the extractors.
    mode: str
    # Name of the forced tool; only meaningful when mode == "tool".
    name: str = ""
@dataclass
class EmulatedToolCall:
    """A tool invocation recovered from model text output."""

    # Call identifier handed back to the client.
    id: str
    # Name of the tool being invoked.
    name: str
    # Decoded argument object for the invocation.
    arguments: dict[str, Any]
def extract_openai_tools(raw: Any) -> list[EmulatedToolDef]:
    """Convert an OpenAI-style ``tools`` array into emulated tool definitions.

    Entries that are not dicts, that lack a dict-valued ``function`` member, or
    whose function name is blank are skipped. Anything other than a list yields
    an empty result.
    """
    definitions: list[EmulatedToolDef] = []
    if not isinstance(raw, list):
        return definitions

    for entry in raw:
        function = entry.get("function") if isinstance(entry, dict) else None
        if not isinstance(function, dict):
            continue
        tool_name = str(function.get("name") or "").strip()
        if not tool_name:
            continue
        parameters = function.get("parameters")
        definitions.append(
            EmulatedToolDef(
                name=tool_name,
                description=str(function.get("description") or "").strip(),
                # Shallow copy: the definition does not alias the request's top-level dict.
                input_schema=dict(parameters) if isinstance(parameters, dict) else {},
            )
        )
    return definitions
def extract_anthropic_tools(raw: Any) -> list[EmulatedToolDef]:
    """Convert an Anthropic-style ``tools`` array into emulated tool definitions.

    Non-dict entries, entries whose ``type`` starts with ``web_search_`` and
    entries with a blank name are skipped. Anything other than a list yields an
    empty result.
    """
    definitions: list[EmulatedToolDef] = []
    if not isinstance(raw, list):
        return definitions

    for entry in raw:
        if not isinstance(entry, dict):
            continue
        if str(entry.get("type") or "").strip().startswith("web_search_"):
            continue
        tool_name = str(entry.get("name") or "").strip()
        if not tool_name:
            continue
        declared_schema = entry.get("input_schema")
        definitions.append(
            EmulatedToolDef(
                name=tool_name,
                description=str(entry.get("description") or "").strip(),
                # Shallow copy: the definition does not alias the request's top-level dict.
                input_schema=dict(declared_schema) if isinstance(declared_schema, dict) else {},
            )
        )
    return definitions
def extract_openai_tool_choice(raw: Any) -> EmulatedToolChoice:
    """Normalise an OpenAI-style ``tool_choice`` value.

    Strings map as follows: ``""``/``"auto"`` -> auto, ``"none"`` -> none,
    ``"required"``/``"any"`` -> any, and any other string is taken as a tool
    name. Dicts are dispatched on their ``type``; for ``function``/``tool`` the
    name is read from ``function.name`` first and then from the top-level
    ``name``. Everything else falls back to auto.
    """
    auto = EmulatedToolChoice(mode="auto")

    if isinstance(raw, str):
        keyword = raw.strip()
        if keyword == "none":
            return EmulatedToolChoice(mode="none")
        if keyword in ("required", "any"):
            return EmulatedToolChoice(mode="any")
        if keyword and keyword != "auto":
            # Any other non-empty string is interpreted as a tool name.
            return EmulatedToolChoice(mode="tool", name=keyword)
        return auto

    if not isinstance(raw, dict):
        return auto

    kind = str(raw.get("type") or "").strip()
    if kind in ("required", "any"):
        return EmulatedToolChoice(mode="any")
    if kind == "none":
        return EmulatedToolChoice(mode="none")
    if kind not in ("function", "tool"):
        return auto

    nested = raw.get("function")
    candidates = (
        nested.get("name") if isinstance(nested, dict) else None,
        raw.get("name"),
    )
    for candidate in candidates:
        chosen = str(candidate or "").strip()
        if chosen:
            return EmulatedToolChoice(mode="tool", name=chosen)
    return auto
def extract_anthropic_tool_choice(raw: Any) -> EmulatedToolChoice:
    """Normalise an Anthropic-style ``tool_choice`` value.

    ``None`` means auto. Non-dict values are delegated to
    ``extract_openai_tool_choice``. Dicts are dispatched on ``type``; a ``tool``
    entry without a usable ``name`` degrades to auto.
    """
    if raw is None:
        return EmulatedToolChoice(mode="auto")
    if not isinstance(raw, dict):
        return extract_openai_tool_choice(raw)

    kind = str(raw.get("type") or "").strip()
    if kind == "none":
        return EmulatedToolChoice(mode="none")
    if kind in ("any", "required"):
        return EmulatedToolChoice(mode="any")
    if kind == "tool":
        forced = str(raw.get("name") or "").strip()
        if forced:
            return EmulatedToolChoice(mode="tool", name=forced)
    # Covers "", "auto", unknown types and a nameless "tool" entry.
    return EmulatedToolChoice(mode="auto")
def has_tool_request(tools: list[EmulatedToolDef], choice: EmulatedToolChoice) -> bool:
    """Return True when tools were declared or the choice mode is not ""/"auto"."""
    if tools:
        return True
    return choice.mode not in ("", "auto")
def inject_tooling(system: str, tools: list[EmulatedToolDef], choice: EmulatedToolChoice) -> str:
    """Append tool-emulation instructions to a system prompt.

    Returns the stripped ``system`` prompt unchanged when no tools are declared.
    Otherwise a tooling section (tool catalogue, action-block format, rules and
    an optional forcing rule derived from ``choice``) is built and appended
    after a ``---`` separator, or returned on its own when ``system`` is blank.
    """
    base_prompt = system.strip()
    if not tools:
        return base_prompt

    # One catalogue line per tool: "name(arg, opt?) - short description".
    catalogue: list[str] = []
    for tool in tools:
        entry = f"{tool.name}({_compact_schema(tool.input_schema)})"
        if tool.description:
            entry = f"{entry} - {_truncate(tool.description, 120)}"
        catalogue.append(entry)

    sections = [
        "You are an AI assistant with DIRECT tool access.",
        "When a request needs local files, terminal state, browser state, current web data, or another external result, use the proxy tools listed below.",
        "Do not claim tools are unavailable.",
        "When you need to use a tool, output exactly one or more structured action blocks in this format:",
        '```json action\n{"tool":"NAME","parameters":{"key":"value"}}\n```',
        "Available tools:",
        "\n".join(catalogue),
        "Rules:",
        "- Use ```json action``` blocks for tool calls.",
        "- If a tool is needed, do not explain first; emit the action block directly.",
        "- If no tool is needed, answer normally.",
        "- Never say tools are unavailable.",
        _force_constraint(choice),
    ]
    # Empty sections (e.g. no forcing rule) are dropped before joining.
    tooling = "\n\n".join(filter(None, sections))
    return f"{base_prompt}\n\n---\n\n{tooling}" if base_prompt else tooling
def action_output_prompt(tool_call_id: str | None, output: str) -> str:
    """Format a tool result as a follow-up prompt for the model.

    Returns "" when ``output`` is empty or whitespace-only. The header mentions
    ``tool_call_id`` only when it is non-blank.
    """
    body = (output or "").strip()
    if not body:
        return ""

    call_ref = (tool_call_id or "").strip()
    header = f"Tool result for {call_ref}:" if call_ref else "Tool result:"
    guidance = (
        "Based on the tool result above, answer the user's request directly if you have enough information. "
        "Only use another tool call if a specific missing fact still requires it."
    )
    return f"{header}\n{body}\n\n{guidance}"
def parse_action_blocks(
    text: str,
    tools: list[EmulatedToolDef],
    *,
    max_scan_bytes: int = 0,
    max_tool_calls: int = 8,
) -> tuple[list[EmulatedToolCall], str]:
    """Extract tool calls from fenced ``json`` / ``json action`` blocks in ``text``.

    Returns ``(calls, remaining_text)``. Fences that produced a call (or that
    repeat an already-seen call) are removed from the remaining text; fences
    that fail to parse or miss required arguments are left in place. When no
    call is found the (possibly truncated) text is returned stripped.

    ``max_scan_bytes`` > 0 truncates ``text`` to that many characters before
    scanning (it is compared against ``len(text)``). Scanning stops once
    ``max_tool_calls`` calls have been collected.
    """
    if not text or not text.strip():
        return [], ""
    if 0 < max_scan_bytes < len(text):
        text = text[:max_scan_bytes]

    # Lower-cased name -> declared name, and declared name -> schema.
    canonical_names: dict[str, str] = {}
    schemas: dict[str, dict[str, Any]] = {}
    for tool in tools:
        if tool.name.strip():
            canonical_names[tool.name.lower()] = tool.name
            schemas[tool.name] = tool.input_schema

    calls: list[EmulatedToolCall] = []
    consumed: list[tuple[int, int]] = []
    fingerprints: set[str] = set()
    fence_pattern = re.compile(r"```json(?:\s+action)?\s*(.*?)```", flags=re.S | re.I)

    for fence in fence_pattern.finditer(text):
        payload = (fence.group(1) or "").strip()
        decoded = _parse_tool_call_json(payload) if payload else None
        if decoded is None:
            continue

        requested_name, arguments = decoded
        tool_name = _normalize_tool_name(requested_name, canonical_names)
        schema = schemas.get(tool_name)
        if schema:
            arguments = _filter_args_by_schema(arguments, schema)
            if not _has_required_args(arguments, schema):
                continue

        # Accepted calls and exact duplicates are both cut out of the text.
        consumed.append(fence.span())
        fingerprint = _tool_call_key(tool_name, arguments)
        if fingerprint in fingerprints:
            continue
        fingerprints.add(fingerprint)
        calls.append(
            EmulatedToolCall(
                id=_stable_call_id(tool_name, arguments),
                name=tool_name,
                arguments=arguments,
            )
        )
        if len(calls) >= max_tool_calls:
            break

    if not calls:
        return [], text.strip()

    # Rebuild the text from the gaps between consumed (ascending) spans.
    kept: list[str] = []
    cursor = 0
    for start, end in consumed:
        kept.append(text[cursor:start])
        cursor = end
    kept.append(text[cursor:])
    return calls, "".join(kept).strip()
def looks_like_refusal(text: str) -> bool:
    """Return True when ``text`` contains a known "I cannot use tools" phrase.

    Matching is a case-insensitive substring test against a fixed list of
    English and Chinese phrases; empty or ``None`` text is never a refusal.
    """
    haystack = (text or "").strip().lower()
    if not haystack:
        return False
    refusal_phrases = (
        "tools are unavailable",
        "cannot call tools",
        "can't call tools",
        "cannot execute",
        "can't execute",
        "没有可用的工具",
        "工具不可用",
        "不能调用工具",
        "无法直接执行",
    )
    for phrase in refusal_phrases:
        if phrase in haystack:
            return True
    return False
def looks_like_missed_tool_use(text: str) -> bool:
    """Return True when ``text`` announces tool use without a parsed call.

    Matching is a case-insensitive substring test against a fixed list of
    English and Chinese phrases; empty or ``None`` text never matches.
    """
    haystack = (text or "").strip().lower()
    if not haystack:
        return False
    intent_phrases = (
        "let me use",
        "i need to use",
        "i will use",
        "i need to run",
        "i will run",
        "我需要使用",
        "让我使用",
        "执行命令",
        "读取文件",
        "查看文件",
        "查询天气",
        "#tool call",
    )
    for phrase in intent_phrases:
        if phrase in haystack:
            return True
    return False
def infer_tool_calls_from_text(
    text: str,
    tools: list[EmulatedToolDef],
) -> list[EmulatedToolCall]:
    """Try to recover a single tool call from free-form model text.

    Inference is attempted only when the text looks like a refusal or like an
    announced-but-missing tool use; the result holds at most one call.
    """
    suspicious = looks_like_refusal(text) or looks_like_missed_tool_use(text)
    if not suspicious:
        return []
    recovered = infer_declared_tool_call_from_text(text, tools)
    if recovered is None:
        return []
    return [recovered]
def force_tooling_prompt(choice: EmulatedToolChoice) -> str:
    """Build the retry prompt sent when a reply contained no action block.

    When ``choice`` forces a specific, non-blank tool name, a sentence
    demanding that tool is appended.
    """
    sentences = [
        "Your last response did not include any ```json action``` block.",
        "You must respond with at least one valid action block now.",
        "Select the single most appropriate available tool for the user request.",
        "Do not explain. Do not say tools are unavailable. Output the action block directly.",
    ]
    forced_name = choice.name.strip() if choice.mode == "tool" else ""
    if forced_name:
        sentences.append(f'You must call "{forced_name}".')
    return " ".join(sentences)
def infer_declared_tool_call_from_text(
    text: str,
    tools: list[EmulatedToolDef],
) -> EmulatedToolCall | None:
    """Return the first declared tool for which a call can be recovered from ``text``.

    For each tool, in declaration order, the extractors are tried in this
    sequence: fenced JSON, ``#Tool Call`` block, ``<function_calls>`` element,
    then the forced-tool fallback. Recovered arguments are filtered against the
    tool schema, and a candidate missing required arguments is discarded.
    """
    for tool in tools:
        declared = tool.name

        event = _extract_fenced_json_tool_call_event_from_text(text, forced_tool_name=declared)
        for extractor in (
            _extract_hash_tool_call_event_from_text,
            _extract_function_call_event_from_text,
        ):
            if event is not None:
                break
            event = extractor(text, forced_tool_name=declared)
        if event is None:
            event = _forced_tool_fallback_event(text, forced_tool_name=declared, tools=tools)
        if event is None:
            continue

        arguments = dict(event.get("input") or {})
        schema = tool.input_schema
        if schema:
            arguments = _filter_args_by_schema(arguments, schema)
            if not _has_required_args(arguments, schema):
                continue

        return EmulatedToolCall(
            id=_stable_call_id(declared, arguments),
            name=declared,
            arguments=arguments,
        )
    return None
def openai_tool_call_from_emulated(call: EmulatedToolCall) -> dict[str, Any]:
    """Render an emulated call as an OpenAI ``tool_calls`` entry.

    Arguments are serialised to a JSON string with non-ASCII characters kept
    verbatim.
    """
    function_payload = {
        "name": call.name,
        "arguments": json.dumps(call.arguments, ensure_ascii=False),
    }
    return {"id": call.id, "type": "function", "function": function_payload}
def _extract_hash_tool_call_event_from_text(
    text: str,
    *,
    forced_tool_name: str | None = None,
) -> dict[str, Any] | None:
    """Parse a ``#Tool Call`` block whose fence tag is the tool name.

    Returns ``{"name", "input"}`` for the first such block, or ``None`` when
    there is no block, the name differs from ``forced_tool_name`` (if given),
    or the body is not a JSON object.
    """
    pattern = r"#Tool Call\s*```([A-Za-z0-9_\-.]+)\s*(\{.*?\})\s*```"
    found = re.search(pattern, (text or "").strip(), flags=re.S)
    if found is None:
        return None

    declared_name = found.group(1).strip()
    if forced_tool_name and declared_name != forced_tool_name:
        return None

    try:
        decoded = json.loads(found.group(2))
    except Exception:
        # Best effort: an undecodable body simply means "no call found".
        return None
    if isinstance(decoded, dict):
        return {"name": declared_name, "input": decoded}
    return None
def _extract_fenced_json_tool_call_event_from_text(
    text: str,
    *,
    forced_tool_name: str | None = None,
) -> dict[str, Any] | None:
    """Parse the first fenced ``json`` / ``json action`` object as a tool call.

    The tool name is read from ``tool``, ``name`` or ``function.name``; the
    arguments from ``parameters``, ``arguments``, ``input`` or
    ``function.arguments`` (a JSON-encoded string is decoded). Returns
    ``{"name", "input"}``, or ``None`` when nothing usable is found or the name
    differs from ``forced_tool_name`` (if given).
    """
    found = re.search(
        r"```json(?:\s+action)?\s*(\{.*?\})\s*```",
        (text or "").strip(),
        flags=re.S | re.I,
    )
    if found is None:
        return None
    try:
        payload = json.loads(found.group(1))
    except Exception:
        return None
    if not isinstance(payload, dict):
        return None

    function = payload.get("function")
    tool_name = str(payload.get("tool") or payload.get("name") or "").strip()
    if not tool_name and isinstance(function, dict):
        tool_name = str(function.get("name") or "").strip()
    if not tool_name:
        return None
    if forced_tool_name and tool_name != forced_tool_name:
        return None

    # First non-null container wins, in the order the formats are preferred.
    arguments = None
    for field in ("parameters", "arguments", "input"):
        arguments = payload.get(field)
        if arguments is not None:
            break
    if arguments is None and isinstance(function, dict):
        arguments = function.get("arguments")

    if isinstance(arguments, str):
        try:
            arguments = json.loads(arguments)
        except Exception:
            return None
    if arguments is None:
        arguments = {}
    if not isinstance(arguments, dict):
        return None
    return {"name": tool_name, "input": arguments}
def _extract_function_call_event_from_text(
    text: str,
    *,
    forced_tool_name: str | None = None,
) -> dict[str, Any] | None:
    """Parse a ``<function_calls>{...}</function_calls>`` element as a tool call.

    The JSON object must carry a non-blank ``name``; ``arguments`` may be an
    object, a JSON-encoded string, or absent (treated as ``{}``). Returns
    ``{"name", "input"}`` or ``None`` when nothing usable is found or the name
    differs from ``forced_tool_name`` (if given).
    """
    found = re.search(
        r"<function_calls>\s*(\{.*?\})\s*</function_calls>",
        (text or "").strip(),
        flags=re.S,
    )
    if found is None:
        return None
    try:
        payload = json.loads(found.group(1))
    except Exception:
        return None
    if not isinstance(payload, dict):
        return None

    tool_name = str(payload.get("name") or "").strip()
    if not tool_name or (forced_tool_name and tool_name != forced_tool_name):
        return None

    arguments = payload.get("arguments")
    if isinstance(arguments, str):
        try:
            arguments = json.loads(arguments)
        except Exception:
            return None
    if arguments is None:
        arguments = {}
    return {"name": tool_name, "input": arguments} if isinstance(arguments, dict) else None
def _forced_tool_fallback_event(
    text: str,
    *,
    forced_tool_name: str | None,
    tools: list[EmulatedToolDef],
) -> dict[str, Any] | None:
    """Last-resort recovery of a call to ``forced_tool_name`` from ``text``.

    The text is first interpreted as a fenced ``tool(...)`` code call; failing
    that, the whole text is parsed as JSON. A JSON object naming a different
    tool is rejected. Arguments are taken from ``input``, then ``arguments``,
    then (for an explicitly named envelope) a dict-valued ``parameters``;
    otherwise every non-reserved top-level key is treated as an argument.

    Returns ``{"name": forced_tool_name, "input": {...}}`` or ``None``.
    """
    if not forced_tool_name:
        return None

    parsed = _tool_code_object_from_text(
        text,
        forced_tool_name,
        single_arg_name=_tool_code_single_arg_name(tools, forced_tool_name),
    )
    if parsed is None:
        try:
            parsed = json.loads((text or "").strip())
        except Exception:
            # Best effort: non-JSON text simply yields no event.
            return None
    if not isinstance(parsed, dict):
        return None

    explicit_name = parsed.get("name") or parsed.get("tool")
    if explicit_name is not None and str(explicit_name) != forced_tool_name:
        return None

    tool_input = parsed.get("input")
    if tool_input is None and "arguments" in parsed:
        tool_input = parsed.get("arguments")
    # The prompted action format is {"tool": ..., "parameters": {...}}; unwrap
    # it when the object is an explicitly named envelope. Without a name the
    # object is assumed to be flat arguments, where "parameters" may be a real
    # argument and must be left alone.
    if (
        tool_input is None
        and explicit_name is not None
        and isinstance(parsed.get("parameters"), dict)
    ):
        tool_input = parsed["parameters"]
    if isinstance(tool_input, str):
        try:
            tool_input = json.loads(tool_input)
        except Exception:
            return None
    if tool_input is None:
        reserved = {"name", "tool", "function", "arguments", "input", "result"}
        tool_input = {k: v for k, v in parsed.items() if k not in reserved}
    if not isinstance(tool_input, dict):
        return None
    return {"name": forced_tool_name, "input": tool_input}
def _tool_code_single_arg_name(
    tools: list[EmulatedToolDef], forced_tool_name: str
) -> str | None:
    """Return the sole property name of ``forced_tool_name``'s schema, if any.

    Only the first tool with a matching name is considered. ``None`` is
    returned when no tool matches, when the schema does not declare exactly one
    property, or when that property name is not a non-blank string.
    """
    target = next((tool for tool in tools if tool.name == forced_tool_name), None)
    if target is None:
        return None

    properties = target.input_schema.get("properties")
    if isinstance(properties, dict) and len(properties) == 1:
        (sole_key,) = properties
        if isinstance(sole_key, str) and sole_key.strip():
            return sole_key
    return None
def _split_top_level_commas(text: str) -> list[str]:
    """Split ``text`` on commas that are outside quotes and brackets."""
    parts: list[str] = []
    current: list[str] = []
    depth = 0
    quote = ""
    escaped = False
    for ch in text:
        if quote:
            current.append(ch)
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == quote:
                quote = ""
            continue
        if ch == "," and depth == 0:
            parts.append("".join(current))
            current = []
            continue
        if ch in "\"'":
            quote = ch
        elif ch in "([{":
            depth += 1
        elif ch in ")]}":
            depth = max(depth - 1, 0)
        current.append(ch)
    if quote:
        # Unbalanced quote (e.g. a bare apostrophe): the quote tracking cannot
        # be trusted, so fall back to the plain comma split.
        return text.split(",")
    parts.append("".join(current))
    return parts


def _parse_tool_code_value(value_text: str) -> Any:
    """Decode one argument value: JSON when possible, else the unquoted text."""
    try:
        return json.loads(value_text)
    except Exception:
        return value_text.strip('"\'')


def _tool_code_object_from_text(
    text: str,
    forced_tool_name: str,
    *,
    single_arg_name: str | None = None,
) -> dict[str, Any] | None:
    """Parse a fenced ``tool_name(...)`` code call into ``{"arguments": {...}}``.

    ``text`` must be exactly one fenced block (no tag, or tagged ``tool_code``,
    ``python`` or ``py``) whose body is a single call to ``forced_tool_name``.
    Keyword arguments (``key=value``) are decoded as JSON where possible and
    otherwise kept as text with surrounding quotes removed. When
    ``single_arg_name`` is given and the call has no keyword arguments, the
    whole argument text is bound to that name.

    Arguments are split only on top-level commas, so commas inside quoted
    strings or brackets do not break a value, and ``=`` inside a positional
    string is not mistaken for a keyword assignment.

    Returns ``None`` when the text does not have this shape or mixes positional
    and keyword arguments.
    """
    raw = (text or "").strip()
    if not raw.startswith("```") or not raw.endswith("```"):
        return None
    lines = raw.splitlines()
    if len(lines) < 2:
        return None
    language = lines[0].strip().lower()[3:].strip()
    if language and language not in {"tool_code", "python", "py"}:
        return None

    body = "\n".join(lines[1:-1]).strip()
    call_match = re.fullmatch(rf"{re.escape(forced_tool_name)}\((.*)\)", body, flags=re.S)
    if not call_match:
        return None
    arguments_text = call_match.group(1).strip()
    if not arguments_text:
        return {"arguments": {}}

    parts = [part.strip() for part in _split_top_level_commas(arguments_text)]
    parts = [part for part in parts if part]
    # A keyword argument starts with an identifier followed by a single "=".
    keyword_pattern = re.compile(r"([A-Za-z_]\w*)\s*=(?!=)\s*(.*)", flags=re.S)
    keyword_matches = [keyword_pattern.fullmatch(part) for part in parts]

    if single_arg_name and not any(keyword_matches):
        return {"arguments": {single_arg_name: _parse_tool_code_value(arguments_text)}}

    arguments: dict[str, Any] = {}
    for matched in keyword_matches:
        if matched is None:
            # A positional argument mixed into a keyword call is not supported.
            return None
        arguments[matched.group(1)] = _parse_tool_code_value(matched.group(2).strip())
    return {"arguments": arguments}
def _parse_tool_call_json(raw: str) -> tuple[str, dict[str, Any]] | None:
    """Decode one action-block payload into ``(tool_name, arguments)``.

    The payload is parsed as strict JSON first; only when that fails is it
    retried after ``_normalize_json`` repairs. Parsing strictly first keeps the
    repairs from rewriting valid input (for example curly quotes inside a
    string value).

    The tool name is read from ``tool``, ``name`` or ``function.name``; the
    arguments from ``parameters``, ``arguments``, ``input`` or
    ``function.arguments``. A string-encoded argument object is decoded, and an
    undecodable one becomes ``{}``. When no argument container exists, all
    top-level keys other than ``tool``/``name`` are used as arguments.

    Returns ``None`` when the payload is not a JSON object, has no tool name,
    or its arguments are not an object.
    """
    try:
        obj = json.loads(raw)
    except Exception:
        try:
            obj = json.loads(_normalize_json(raw))
        except Exception:
            # Best effort: an unparseable block is ignored by the caller.
            return None
    if not isinstance(obj, dict):
        return None

    name = str(obj.get("tool") or obj.get("name") or "").strip()
    fn = obj.get("function")
    if not name and isinstance(fn, dict):
        name = str(fn.get("name") or "").strip()
    if not name:
        return None

    arguments = obj.get("parameters")
    if arguments is None:
        arguments = obj.get("arguments")
    if arguments is None:
        arguments = obj.get("input")
    if arguments is None and isinstance(fn, dict):
        arguments = fn.get("arguments")
    if isinstance(arguments, str):
        try:
            arguments = json.loads(arguments)
        except Exception:
            arguments = {}
    if arguments is None:
        # Flat form: {"tool": "x", "path": "..."} -> arguments {"path": "..."}.
        arguments = {k: v for k, v in obj.items() if k not in {"tool", "name"}}
    if not isinstance(arguments, dict):
        return None
    return name, arguments
def _normalize_tool_name(raw: str, available: dict[str, str]) -> str:
    """Map a model-emitted tool name onto a declared tool name.

    ``available`` maps lower-cased declared names to their declared spelling.
    Resolution order:

    1. case-insensitive exact match;
    2. match after treating ``-`` and spaces as ``_`` on both sides
       (``Read-File`` -> ``read_file``);
    3. a fixed alias table (``bash`` -> ``terminal`` etc.), used only when the
       alias target is declared.

    Falls back to the stripped input when nothing matches, and returns "" for
    blank input.
    """
    name = raw.strip()
    if not name:
        return ""

    exact = available.get(name.lower())
    if exact:
        return exact

    def canonical(value: str) -> str:
        return value.lower().replace("-", "_").replace(" ", "_")

    key = canonical(name)
    # Separator-insensitive lookup against the declared tools themselves.
    for declared_key, declared in available.items():
        if declared and canonical(declared_key) == key:
            return declared

    aliases = {
        "bash": "terminal",
        "shell": "terminal",
        "read": "read_file",
        "grep": "search_files",
        "glob": "search_files",
        "edit": "patch",
        "write": "write_file",
    }
    mapped = aliases.get(key)
    if mapped and mapped in available:
        return available[mapped]
    return name
def _filter_args_by_schema(args: dict[str, Any], schema: dict[str, Any]) -> dict[str, Any]:
    """Drop arguments that the schema's ``properties`` do not declare.

    When the schema has no usable (non-empty dict) ``properties``, ``args`` is
    returned as-is (the same object, not a copy).
    """
    declared = schema.get("properties")
    if isinstance(declared, dict) and declared:
        return {key: value for key, value in args.items() if key in declared}
    return args
def _has_required_args(args: dict[str, Any], schema: dict[str, Any]) -> bool:
    """Check that every required schema key is present and not a blank string.

    A schema whose ``required`` entry is missing or not a list imposes no
    requirements; non-string entries inside the list are ignored.
    """
    required = schema.get("required")
    if not isinstance(required, list):
        return True

    for field in (entry for entry in required if isinstance(entry, str)):
        if field not in args:
            return False
        supplied = args[field]
        # Whitespace-only strings count as missing; other falsy values are fine.
        if isinstance(supplied, str) and not supplied.strip():
            return False
    return True
def _compact_schema(schema: dict[str, Any]) -> str:
    """Render a schema as a compact signature such as ``"path, encoding?"``.

    Property names are listed in sorted order; names not in ``required`` get a
    trailing ``?``. Returns "" when the schema declares no properties.

    A ``required`` entry that is not a list (``None``, a boolean, a string) is
    treated as "nothing required" instead of raising, matching how
    ``_has_required_args`` reads the same field.
    """
    properties = schema.get("properties")
    if not isinstance(properties, dict) or not properties:
        return ""

    declared_required = schema.get("required")
    if not isinstance(declared_required, list):
        declared_required = []
    required = {item for item in declared_required if isinstance(item, str)}

    return ", ".join(key if key in required else f"{key}?" for key in sorted(properties))
def _truncate(text: str, max_len: int) -> str:
    """Strip ``text`` and cut it to ``max_len`` characters plus ``...`` if longer."""
    stripped = text.strip()
    return stripped if len(stripped) <= max_len else f"{stripped[:max_len]}..."
def _force_constraint(choice: EmulatedToolChoice) -> str:
    """Return the extra prompt rule implied by ``choice``, or "" when none applies."""
    if choice.mode == "any":
        return "- You must output at least one ```json action``` block in this reply."
    forced_name = choice.name.strip() if choice.mode == "tool" else ""
    if forced_name:
        return f'- You must call "{forced_name}" in this reply.'
    return ""
def _normalize_json(text: str) -> str:
    """Apply light repairs to almost-JSON text emitted by a model.

    The text is stripped, typographic double quotes (U+201C / U+201D) are
    replaced with ASCII ``"``, and a trailing comma directly before a newline
    and a closing ``}`` or ``]`` is removed.
    """
    return (
        text.strip()
        # Escapes keep the intended curly-quote characters explicit. The
        # previous empty-string search pattern inserted a quote between every
        # character, which made every payload unparseable.
        .replace("\u201c", '"')
        .replace("\u201d", '"')
        .replace(",\n}", "\n}")
        .replace(",\n]", "\n]")
    )
def _tool_call_key(name: str, arguments: dict[str, Any]) -> str:
    """Build a key identifying a call: lower-cased name, NUL, canonical JSON arguments."""
    canonical_arguments = json.dumps(arguments, ensure_ascii=False, sort_keys=True)
    return "\0".join((name.lower(), canonical_arguments))
def _stable_call_id(name: str, arguments: dict[str, Any]) -> str:
    """Derive a call id from the call key, so equal calls get equal ids.

    The id is ``call_`` followed by the first 16 hex digits of a UUIDv5 of the
    key under ``uuid.NAMESPACE_OID``.
    """
    digest = uuid.uuid5(uuid.NAMESPACE_OID, _tool_call_key(name, arguments))
    return f"call_{digest.hex[:16]}"