Release v1.4.4

This commit is contained in:
lutc5
2026-05-06 11:03:55 +08:00
parent a02fd51c19
commit a3a9c278f6
9 changed files with 542 additions and 42 deletions

View File

@@ -471,6 +471,15 @@ func (s *Server) handleAnthropicMessages(w http.ResponseWriter, r *http.Request)
fmt.Printf("[ANTHROPIC REQUEST] %s\n", string(reqBody))
}
if call, ok := anthropicHostedWebSearchCall(req); ok {
if req.Stream {
s.writeAnthropicHostedToolStream(w, req.Model, call)
return
}
s.writeAnthropicHostedToolResponse(w, req.Model, call)
return
}
normalized, err := normalizeAnthropicRequest(req)
if err != nil {
writeAnthropicError(w, http.StatusBadRequest, "invalid_request_error", err.Error())
@@ -611,7 +620,7 @@ func (s *Server) handleAnthropicStream(w http.ResponseWriter, r *http.Request, r
}
msgID := fmt.Sprintf("msg_%d", time.Now().UnixNano())
if len(req.Tools) > 0 {
if shouldAggregateToolStream(req) {
result, err := s.svc.Generate(r.Context(), req)
if err != nil {
writeAnthropicError(w, http.StatusInternalServerError, "api_error", err.Error())
@@ -742,6 +751,7 @@ func (s *Server) handleAnthropicStream(w http.ResponseWriter, r *http.Request, r
return
}
filter := newToolStreamFilter(len(req.Tools) > 0)
eventsCh := events
doneCh := done
var final *service.ChatResult
@@ -756,18 +766,20 @@ func (s *Server) handleAnthropicStream(w http.ResponseWriter, r *http.Request, r
eventsCh = nil
continue
}
if event.Delta == "" {
continue
}
if err := writeSSEEvent(w, flusher, "content_block_delta", map[string]any{
"type": "content_block_delta",
"index": 0,
"delta": map[string]any{
"type": "text_delta",
"text": event.Delta,
},
}); err != nil {
return
for _, delta := range filter.Push(event.Delta) {
if delta == "" {
continue
}
if err := writeSSEEvent(w, flusher, "content_block_delta", map[string]any{
"type": "content_block_delta",
"index": 0,
"delta": map[string]any{
"type": "text_delta",
"text": delta,
},
}); err != nil {
return
}
}
case result, ok := <-doneCh:
if !ok {
@@ -800,6 +812,23 @@ func (s *Server) handleAnthropicStream(w http.ResponseWriter, r *http.Request, r
})
return
}
if len(final.ToolCalls) == 0 {
for _, delta := range filter.Flush() {
if delta == "" {
continue
}
if err := writeSSEEvent(w, flusher, "content_block_delta", map[string]any{
"type": "content_block_delta",
"index": 0,
"delta": map[string]any{
"type": "text_delta",
"text": delta,
},
}); err != nil {
return
}
}
}
if err := writeSSEEvent(w, flusher, "content_block_stop", map[string]any{
"type": "content_block_stop",
"index": 0,
@@ -844,6 +873,96 @@ func (s *Server) handleAnthropicStream(w http.ResponseWriter, r *http.Request, r
})
}
// writeAnthropicHostedToolResponse answers a non-streaming request with a
// single Anthropic-style "message" whose only content block is a tool_use
// describing call. A blank model name is reported as "lingma", and both
// usage counters are reported as zero.
func (s *Server) writeAnthropicHostedToolResponse(w http.ResponseWriter, model string, call toolemulation.ToolCall) {
	if model = strings.TrimSpace(model); model == "" {
		model = "lingma"
	}

	toolUse := map[string]any{
		"type":  "tool_use",
		"id":    call.ID,
		"name":  call.Name,
		"input": call.Arguments,
	}
	usage := map[string]any{
		"input_tokens":  0,
		"output_tokens": 0,
	}
	payload := map[string]any{
		"id":            fmt.Sprintf("msg_%d", time.Now().UnixNano()),
		"type":          "message",
		"role":          "assistant",
		"content":       []map[string]any{toolUse},
		"model":         model,
		"stop_reason":   "tool_use",
		"stop_sequence": nil,
		"usage":         usage,
	}
	writeJSON(w, http.StatusOK, payload)
}
// writeAnthropicHostedToolStream answers a streaming request with a fixed
// sequence of SSE events describing a single tool_use block for call:
// message_start, content_block_start, content_block_delta (input_json_delta
// carrying the JSON-encoded arguments), content_block_stop, message_delta
// (stop_reason "tool_use") and message_stop.
//
// A blank model name is reported as "lingma". If w does not implement
// http.Flusher, or the tool arguments cannot be JSON-encoded, an Anthropic
// error response is written instead and no stream is started.
func (s *Server) writeAnthropicHostedToolStream(w http.ResponseWriter, model string, call toolemulation.ToolCall) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		writeAnthropicError(w, http.StatusInternalServerError, "api_error", "streaming is not supported by this server")
		return
	}

	// Encode the arguments before anything is written: once the stream has
	// started, a failure can no longer be reported as a regular error
	// response, and an empty partial_json would reach the client silently.
	argsJSON, err := json.Marshal(call.Arguments)
	if err != nil {
		writeAnthropicError(w, http.StatusInternalServerError, "api_error", fmt.Sprintf("encoding tool arguments: %v", err))
		return
	}

	model = strings.TrimSpace(model)
	if model == "" {
		model = "lingma"
	}

	streamingHeaders(w)
	msgID := fmt.Sprintf("msg_%d", time.Now().UnixNano())

	if err := writeSSEEvent(w, flusher, "message_start", map[string]any{
		"type": "message_start",
		"message": map[string]any{
			"id":            msgID,
			"type":          "message",
			"role":          "assistant",
			"content":       []any{},
			"model":         model,
			"stop_reason":   nil,
			"stop_sequence": nil,
			"usage": map[string]any{
				"input_tokens":  0,
				"output_tokens": 0,
			},
		},
	}); err != nil {
		return
	}

	// The block starts with an empty input object; the real arguments follow
	// in a single input_json_delta event.
	if err := writeSSEEvent(w, flusher, "content_block_start", map[string]any{
		"type":          "content_block_start",
		"index":         0,
		"content_block": map[string]any{"type": "tool_use", "id": call.ID, "name": call.Name, "input": map[string]any{}},
	}); err != nil {
		return
	}

	if err := writeSSEEvent(w, flusher, "content_block_delta", map[string]any{
		"type":  "content_block_delta",
		"index": 0,
		"delta": map[string]any{"type": "input_json_delta", "partial_json": string(argsJSON)},
	}); err != nil {
		return
	}

	if err := writeSSEEvent(w, flusher, "content_block_stop", map[string]any{
		"type":  "content_block_stop",
		"index": 0,
	}); err != nil {
		return
	}

	// The two closing events are best effort: nothing else can be done for
	// the client if writing them fails.
	_ = writeSSEEvent(w, flusher, "message_delta", map[string]any{
		"type": "message_delta",
		"delta": map[string]any{
			"stop_reason":   "tool_use",
			"stop_sequence": nil,
		},
		"usage": map[string]any{
			"output_tokens": 0,
		},
	})
	_ = writeSSEEvent(w, flusher, "message_stop", map[string]any{"type": "message_stop"})
}
func (s *Server) handleOpenAIStream(w http.ResponseWriter, r *http.Request, req service.ChatRequest) {
flusher, ok := w.(http.Flusher)
if !ok {
@@ -858,7 +977,7 @@ func (s *Server) handleOpenAIStream(w http.ResponseWriter, r *http.Request, req
chatID := fmt.Sprintf("chatcmpl-%d", time.Now().UnixNano())
created := time.Now().Unix()
if len(req.Tools) > 0 {
if shouldAggregateToolStream(req) {
result, err := s.svc.Generate(r.Context(), req)
if err != nil {
writeOpenAIError(w, http.StatusInternalServerError, "api_error", err.Error())
@@ -929,8 +1048,10 @@ func (s *Server) handleOpenAIStream(w http.ResponseWriter, r *http.Request, req
return
}
filter := newToolStreamFilter(len(req.Tools) > 0)
eventsCh := events
doneCh := done
var final *service.ChatResult
var finalErr error
for eventsCh != nil || doneCh != nil {
@@ -942,31 +1063,34 @@ func (s *Server) handleOpenAIStream(w http.ResponseWriter, r *http.Request, req
eventsCh = nil
continue
}
if event.Delta == "" {
continue
}
if err := writeOpenAIChunk(w, flusher, map[string]any{
"id": chatID,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": []map[string]any{
{
"index": 0,
"delta": map[string]any{
"content": event.Delta,
for _, delta := range filter.Push(event.Delta) {
if delta == "" {
continue
}
if err := writeOpenAIChunk(w, flusher, map[string]any{
"id": chatID,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": []map[string]any{
{
"index": 0,
"delta": map[string]any{
"content": delta,
},
"finish_reason": nil,
},
"finish_reason": nil,
},
},
}); err != nil {
return
}); err != nil {
return
}
}
case result, ok := <-doneCh:
if !ok {
doneCh = nil
continue
}
final = result.Result
finalErr = result.Err
doneCh = nil
}
@@ -985,6 +1109,63 @@ func (s *Server) handleOpenAIStream(w http.ResponseWriter, r *http.Request, req
flusher.Flush()
return
}
if final == nil {
_ = writeOpenAIChunk(w, flusher, map[string]any{
"error": map[string]any{
"message": "stream finished without a final result",
"type": "api_error",
"code": nil,
"param": nil,
},
})
_, _ = fmt.Fprint(w, "data: [DONE]\n\n")
flusher.Flush()
return
}
if len(final.ToolCalls) == 0 {
for _, delta := range filter.Flush() {
if delta == "" {
continue
}
if err := writeOpenAIChunk(w, flusher, map[string]any{
"id": chatID,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": []map[string]any{
{
"index": 0,
"delta": map[string]any{
"content": delta,
},
"finish_reason": nil,
},
},
}); err != nil {
return
}
}
}
for i, tc := range final.ToolCalls {
argsJSON, _ := json.Marshal(tc.Arguments)
_ = writeOpenAIChunk(w, flusher, map[string]any{
"id": chatID, "object": "chat.completion.chunk", "created": created, "model": model,
"choices": []map[string]any{{
"index": 0,
"delta": map[string]any{
"tool_calls": []map[string]any{{
"index": i, "id": tc.ID, "type": "function",
"function": map[string]any{"name": tc.Name, "arguments": string(argsJSON)},
}},
},
"finish_reason": nil,
}},
})
}
finishReason := "stop"
if len(final.ToolCalls) > 0 {
finishReason = "tool_calls"
}
if err := writeOpenAIChunk(w, flusher, map[string]any{
"id": chatID,
"object": "chat.completion.chunk",
@@ -994,7 +1175,7 @@ func (s *Server) handleOpenAIStream(w http.ResponseWriter, r *http.Request, req
{
"index": 0,
"delta": map[string]any{},
"finish_reason": "stop",
"finish_reason": finishReason,
},
},
}); err != nil {
@@ -1004,6 +1185,198 @@ func (s *Server) handleOpenAIStream(w http.ResponseWriter, r *http.Request, req
flusher.Flush()
}
// shouldAggregateToolStream reports whether a streaming request should be
// answered from one aggregated Generate call: the request must declare at
// least one tool and the LINGMA_AGGREGATE_TOOL_STREAM environment variable
// must hold a truthy value.
func shouldAggregateToolStream(req service.ChatRequest) bool {
	if len(req.Tools) == 0 {
		return false
	}
	return truthyEnv("LINGMA_AGGREGATE_TOOL_STREAM")
}
// toolStreamFilter sits between the upstream text deltas and the client
// stream. While enabled it holds text back so that anything that looks like
// the start of an emulated tool-call ("action") block is not streamed out.
type toolStreamFilter struct {
	// enabled switches filtering on; when false Push passes deltas through.
	enabled bool
	// buffer holds text received by Push that has not been released yet.
	buffer string
	// blocked is set once an action-block marker was found; from then on
	// neither Push nor Flush releases any text.
	blocked bool
}

// newToolStreamFilter returns a filter with an empty buffer that is not
// blocked; enabled selects between filtering and plain pass-through.
func newToolStreamFilter(enabled bool) *toolStreamFilter {
	filter := new(toolStreamFilter)
	filter.enabled = enabled
	return filter
}
// Push feeds one upstream delta into the filter and returns the text chunks
// that may be forwarded to the client right away (possibly none).
//
// Empty deltas are ignored and a disabled filter passes the delta through
// untouched. Otherwise the delta is appended to the buffer and:
//   - once the filter is blocked, nothing is released;
//   - if an action-block marker is found, the text before it is released and
//     the filter becomes blocked with the marker onwards kept in the buffer;
//   - if the whole buffer still looks like the beginning of a marker, nothing
//     is released yet;
//   - otherwise everything except the last 96 runes is released.
func (f *toolStreamFilter) Push(delta string) []string {
	switch {
	case delta == "":
		return nil
	case !f.enabled:
		return []string{delta}
	}

	f.buffer += delta
	if f.blocked {
		return nil
	}

	start := actionBlockStartIndex(f.buffer)
	if start < 0 {
		if looksLikeActionPrefix(f.buffer) {
			return nil
		}
		// Keep a tail back so a marker split across deltas is still caught.
		return f.flushSafeTail(96)
	}

	leading := f.buffer[:start]
	f.buffer = f.buffer[start:]
	f.blocked = true
	if leading == "" {
		return nil
	}
	return []string{leading}
}
// Flush releases whatever text is still held in the buffer as a single chunk
// and empties the buffer. It returns nil when the buffer is empty or when the
// filter is blocked, in which case the buffer is left untouched.
func (f *toolStreamFilter) Flush() []string {
	if f.blocked || f.buffer == "" {
		return nil
	}
	pending := f.buffer
	f.buffer = ""
	return []string{pending}
}
// flushSafeTail releases the front of the buffer while keeping the last
// tailRunes runes buffered. It returns nil when the buffer holds no more than
// tailRunes runes. The split is made on rune boundaries, not bytes.
func (f *toolStreamFilter) flushSafeTail(tailRunes int) []string {
	all := []rune(f.buffer)
	cut := len(all) - tailRunes
	if cut <= 0 {
		return nil
	}
	head := string(all[:cut])
	f.buffer = string(all[cut:])
	if head == "" {
		return nil
	}
	return []string{head}
}
// actionBlockStartIndex returns the byte offset in text of the earliest
// action-block marker, or -1 when text contains none. Markers are matched
// without regard to ASCII letter case.
//
// The returned offset is valid for slicing text itself.
func actionBlockStartIndex(text string) int {
	// Fold ASCII letters only, byte for byte. strings.ToLower can change the
	// byte length of non-ASCII runes, which would make offsets found in the
	// lowered copy point at the wrong place in (or past the end of) text.
	// Bytes in 'A'..'Z' never occur inside a multi-byte UTF-8 sequence, so
	// this cannot corrupt other characters.
	folded := []byte(text)
	for i, c := range folded {
		if 'A' <= c && c <= 'Z' {
			folded[i] = c + ('a' - 'A')
		}
	}
	lower := string(folded)

	markers := []string{
		"```json action",
		"``` action",
		"{\"tool\"",
		"{\"name\"",
	}
	best := -1
	for _, marker := range markers {
		if idx := strings.Index(lower, marker); idx >= 0 && (best == -1 || idx < best) {
			best = idx
		}
	}
	return best
}
// looksLikeActionPrefix reports whether text, ignoring leading whitespace and
// letter case, could still turn out to be an action block: it is empty, it is
// a leading part of one of the markers, or it already starts with a marker.
func looksLikeActionPrefix(text string) bool {
	candidate := strings.ToLower(strings.TrimLeft(text, " \t\r\n"))
	if len(candidate) == 0 {
		return true
	}
	for _, marker := range []string{
		"```json action",
		"``` action",
		"{\"tool\"",
		"{\"name\"",
	} {
		switch {
		case strings.HasPrefix(marker, candidate):
			// Only part of the marker has arrived so far.
			return true
		case strings.HasPrefix(candidate, marker):
			// The full marker is present at the start.
			return true
		}
	}
	return false
}
// truthyEnv reports whether the environment variable name is set to one of
// "1", "true", "yes" or "on", ignoring surrounding whitespace and letter case.
func truthyEnv(name string) bool {
	switch strings.ToLower(strings.TrimSpace(os.Getenv(name))) {
	case "1", "true", "yes", "on":
		return true
	default:
		return false
	}
}
// anthropicHostedWebSearchCall decides whether req should be answered
// directly with a synthetic web_search tool call. It returns the call and
// true only when req declares a hosted web_search tool, that tool is the one
// being requested, and a non-empty query can be taken from the messages;
// otherwise it returns a zero ToolCall and false.
func anthropicHostedWebSearchCall(req anthropicRequest) (toolemulation.ToolCall, bool) {
	var none toolemulation.ToolCall

	if !hasAnthropicHostedWebSearchTool(req.Tools) || !anthropicHostedWebSearchRequested(req.Tools, req.ToolChoice) {
		return none, false
	}

	query := anthropicHostedWebSearchQuery(req.Messages)
	if query == "" {
		return none, false
	}

	call := toolemulation.ToolCall{
		ID:        fmt.Sprintf("toolu_%d", time.Now().UnixNano()),
		Name:      "web_search",
		Arguments: map[string]any{"query": query},
	}
	return call, true
}
// hasAnthropicHostedWebSearchTool reports whether raw is a list of tool
// definitions containing at least one entry named "web_search" whose "type"
// is accepted by toolemulation.IsAnthropicHostedToolType. Entries that are
// not JSON objects are ignored; a raw value that is not a list yields false.
func hasAnthropicHostedWebSearchTool(raw any) bool {
	tools, isList := raw.([]any)
	if !isList {
		return false
	}
	for i := range tools {
		tool, isObject := tools[i].(map[string]any)
		if !isObject {
			continue
		}
		if strings.TrimSpace(stringFromAny(tool["name"])) != "web_search" {
			continue
		}
		if toolemulation.IsAnthropicHostedToolType(stringFromAny(tool["type"])) {
			return true
		}
	}
	return false
}
// anthropicHostedWebSearchRequested reports whether the request asks for the
// web_search tool specifically. That is the case when choice is an object
// whose "name" is "web_search", or when tools is a list holding exactly one
// entry and that entry is a hosted web_search tool definition.
func anthropicHostedWebSearchRequested(tools any, choice any) bool {
	if forced, ok := choice.(map[string]any); ok && strings.TrimSpace(stringFromAny(forced["name"])) == "web_search" {
		return true
	}

	list, ok := tools.([]any)
	if !ok || len(list) != 1 {
		return false
	}
	only, ok := list[0].(map[string]any)
	if !ok {
		return false
	}
	if strings.TrimSpace(stringFromAny(only["name"])) != "web_search" {
		return false
	}
	return toolemulation.IsAnthropicHostedToolType(stringFromAny(only["type"]))
}
// anthropicHostedWebSearchQuery walks messages from newest to oldest and
// returns the cleaned search query taken from the first "user" message whose
// extracted text is not blank. It returns "" when no such message exists.
func anthropicHostedWebSearchQuery(messages []rawMessage) string {
	for i := len(messages) - 1; i >= 0; i-- {
		msg := &messages[i]
		if strings.ToLower(strings.TrimSpace(msg.Role)) != "user" {
			continue
		}
		if text := strings.TrimSpace(extractText(msg.Content)); text != "" {
			return cleanHostedWebSearchQuery(text)
		}
	}
	return ""
}
// cleanHostedWebSearchQuery reduces a user message to the bare search query.
// If the text contains one of the known instruction prefixes (matched without
// regard to ASCII letter case; the first prefix in list order that occurs
// wins), everything up to and including that prefix is dropped. Surrounding
// whitespace, quotes and backticks are then trimmed from the result.
func cleanHostedWebSearchQuery(text string) string {
	text = strings.TrimSpace(text)
	prefixes := []string{
		"Perform a web search for the query:",
		"Search the web for:",
		"Web search query:",
	}

	// Fold ASCII letters only, byte for byte, so that offsets found in the
	// folded copy are valid offsets into text. strings.ToLower can change the
	// byte length of non-ASCII runes, which made the slice below cut at the
	// wrong place or run past the end of text.
	folded := []byte(text)
	for i, c := range folded {
		if 'A' <= c && c <= 'Z' {
			folded[i] = c + ('a' - 'A')
		}
	}
	lower := string(folded)

	for _, prefix := range prefixes {
		if idx := strings.Index(lower, strings.ToLower(prefix)); idx >= 0 {
			text = strings.TrimSpace(text[idx+len(prefix):])
			break
		}
	}
	return strings.Trim(text, " \t\r\n\"'`")
}
func normalizeAnthropicRequest(req anthropicRequest) (service.ChatRequest, error) {
messages := make([]service.ChatMessage, 0, len(req.Messages))
for _, message := range req.Messages {

View File

@@ -158,6 +158,66 @@ func TestNormalizeAnthropicRequestRejectsEmptyMessages(t *testing.T) {
}
}
// TestAnthropicHostedWebSearchCall checks that a request carrying a hosted
// web_search tool, an explicit tool choice and an instruction-style user
// message is turned into a web_search tool call with the bare query.
func TestAnthropicHostedWebSearchCall(t *testing.T) {
	hostedTool := map[string]any{
		"name": "web_search",
		"type": "web_search_20250305",
	}
	userText := map[string]any{
		"type": "text",
		"text": "Perform a web search for the query: Hermes agent web UI documentation",
	}
	req := anthropicRequest{
		Model: "Kimi-K2.6",
		Tools: []any{hostedTool},
		ToolChoice: map[string]any{
			"type": "tool",
			"name": "web_search",
		},
		Messages: []rawMessage{{
			Role:    "user",
			Content: []any{userText},
		}},
	}

	call, ok := anthropicHostedWebSearchCall(req)
	switch {
	case !ok:
		t.Fatal("expected hosted web_search tool call")
	case call.Name != "web_search":
		t.Fatalf("tool name = %q", call.Name)
	case call.Arguments["query"] != "Hermes agent web UI documentation":
		t.Fatalf("query = %#v", call.Arguments["query"])
	case !strings.HasPrefix(call.ID, "toolu_"):
		t.Fatalf("id = %q", call.ID)
	}
}
// TestAnthropicHostedWebSearchCallIgnoresRegularClientWebSearch checks that a
// client-defined tool that merely happens to be named web_search (it has an
// input_schema and no hosted "type") does not trigger the hosted shortcut.
func TestAnthropicHostedWebSearchCallIgnoresRegularClientWebSearch(t *testing.T) {
	clientTool := map[string]any{
		"name": "web_search",
		"input_schema": map[string]any{
			"type": "object",
		},
	}
	req := anthropicRequest{
		Tools: []any{clientTool},
		Messages: []rawMessage{{
			Role:    "user",
			Content: "Perform a web search for the query: Lingma",
		}},
	}

	_, ok := anthropicHostedWebSearchCall(req)
	if ok {
		t.Fatal("regular client web_search should stay in prompt tool emulation")
	}
}
func TestDiscoveryCompatibilityEndpoints(t *testing.T) {
server := NewServer("", service.New(service.Config{
Model: "Qwen3-Coder",
@@ -179,6 +239,29 @@ func TestDiscoveryCompatibilityEndpoints(t *testing.T) {
}
}
// TestToolStreamFilterStreamsNormalTextWithTools checks that ordinary text
// passes through an enabled filter: part of it is released before Flush, and
// the released chunks reassemble to exactly the text that was pushed, with
// nothing dropped, duplicated or reordered.
func TestToolStreamFilterStreamsNormalTextWithTools(t *testing.T) {
	filter := newToolStreamFilter(true)
	head := strings.Repeat("你", 120)
	tail := "后续内容"

	var chunks []string
	chunks = append(chunks, filter.Push(head)...)
	// 120 runes exceed the tail the filter holds back, so some text must
	// already have been streamed at this point.
	if len(chunks) == 0 {
		t.Fatal("expected text to be streamed before Flush")
	}
	chunks = append(chunks, filter.Push(tail)...)
	chunks = append(chunks, filter.Flush()...)

	out := strings.Join(chunks, "")
	if out != head+tail {
		t.Fatalf("streamed text = %q", out)
	}
}
// TestToolStreamFilterBuffersActionBlock checks that an action block arriving
// in two deltas, with the marker split across them, is never released, either
// by Push or by the final Flush.
func TestToolStreamFilterBuffersActionBlock(t *testing.T) {
	filter := newToolStreamFilter(true)
	deltas := []string{
		"```json ",
		"action\n{\"tool\":\"Bash\",\"parameters\":{\"command\":\"pwd\"}}\n```",
	}

	var leaked []string
	for _, delta := range deltas {
		leaked = append(leaked, filter.Push(delta)...)
	}
	leaked = append(leaked, filter.Flush()...)

	if len(leaked) != 0 {
		t.Fatalf("unexpected leaked action chunks: %#v", leaked)
	}
}
func TestParseImageURLReadsLocalFileURL(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "sample.jpg")