Files
lingma-proxy-compose/internal/httpapi/server.go
GitHub Actions a4cedecca6 Add Docker Compose Lingma bootstrap support
Package the proxy for Docker Compose deployments and add Lingma bootstrap, session restore, and runtime status support for containerized remote usage.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-07 23:56:05 +08:00

2329 lines
60 KiB
Go

package httpapi
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"image"
_ "image/gif"
"image/jpeg"
_ "image/png"
"io"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"sync"
"time"
"lingma-ipc-proxy/internal/service"
"lingma-ipc-proxy/internal/toolemulation"
)
// Server exposes a service.Service over HTTP. NewServer builds the route
// table and the underlying http.Server; ListenAndServe and Shutdown drive it.
type Server struct {
// svc is the backing service every handler delegates to.
svc *service.Service
// http is the net/http server holding the address and the handler chain.
http *http.Server
// sem is a buffered channel whose capacity NewServer takes from
// maxConcurrentRequests(). Presumably the semaphore behind acquire/release,
// which are defined outside this view — confirm there.
sem chan struct{}
// recMu presumably guards records — confirm against debugRecords/withRecorder.
recMu sync.RWMutex
// records holds captured per-request summaries (see debugRequestRecord).
records []debugRequestRecord
// OnRequest is called after each request completes with summary info.
// Arguments, in order: method, path, statusCode, duration, requestBody, responseBody.
OnRequest func(method, path string, statusCode int, duration time.Duration, reqBody, respBody string)
}
// anthropicRequest is the JSON body decoded by the /v1/messages and
// /v1/messages/count_tokens handlers. Fields typed `any` (System, Tools,
// ToolChoice, Thinking) are kept as generic decoded JSON here and interpreted
// later by the normalization helpers.
type anthropicRequest struct {
Model string `json:"model"`
MaxTokens int `json:"max_tokens,omitempty"`
System any `json:"system,omitempty"`
Messages []rawMessage `json:"messages"`
// Stream selects the SSE response path in handleAnthropicMessages.
Stream bool `json:"stream,omitempty"`
Tools any `json:"tools,omitempty"`
ToolChoice any `json:"tool_choice,omitempty"`
// Pointer types distinguish "not provided" from an explicit zero value.
Temperature *float64 `json:"temperature,omitempty"`
TopP *float64 `json:"top_p,omitempty"`
TopK int `json:"top_k,omitempty"`
StopSequences []string `json:"stop_sequences,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
Thinking any `json:"thinking,omitempty"`
}
// openAIChatRequest is the JSON body decoded by the chat-completions handler
// (/v1/chat/completions and /api/v1/chat/completions). Fields typed `any`
// (Tools, ToolChoice, Stop, ResponseFormat) are kept as generic decoded JSON.
type openAIChatRequest struct {
Model string `json:"model"`
Messages []rawMessage `json:"messages"`
// Stream selects the SSE response path in handleOpenAIChatCompletions.
Stream bool `json:"stream,omitempty"`
MaxTokens int `json:"max_tokens,omitempty"`
MaxCompletionTokens int `json:"max_completion_tokens,omitempty"`
Tools any `json:"tools,omitempty"`
ToolChoice any `json:"tool_choice,omitempty"`
// Pointer types distinguish "not provided" from an explicit zero/false value.
ParallelToolCalls *bool `json:"parallel_tool_calls,omitempty"`
Temperature *float64 `json:"temperature,omitempty"`
TopP *float64 `json:"top_p,omitempty"`
Stop any `json:"stop,omitempty"`
PresencePenalty float64 `json:"presence_penalty,omitempty"`
FrequencyPenalty float64 `json:"frequency_penalty,omitempty"`
Logprobs bool `json:"logprobs,omitempty"`
TopLogprobs int `json:"top_logprobs,omitempty"`
ResponseFormat any `json:"response_format,omitempty"`
Seed int `json:"seed,omitempty"`
User string `json:"user,omitempty"`
ReasoningEffort string `json:"reasoning_effort,omitempty"`
}
// rawMessage is one chat message as it appears in either request format
// (it is the element type of both anthropicRequest.Messages and
// openAIChatRequest.Messages).
type rawMessage struct {
Role string `json:"role"`
// Content is left as generic decoded JSON; its concrete shape is resolved
// by code outside this struct.
Content any `json:"content"`
ToolCalls []any `json:"tool_calls,omitempty"`
ToolCallID string `json:"tool_call_id,omitempty"`
}
// modelResponse is one entry of the "data" list written by handleModels.
type modelResponse struct {
ID string `json:"id"`
// Object is set to "model" by handleModels.
Object string `json:"object"`
// Created is a Unix timestamp in seconds (handleModels uses time.Now().Unix()).
Created int64 `json:"created"`
OwnedBy string `json:"owned_by"`
Name string `json:"name,omitempty"`
}
// debugRequestRecord is the JSON shape of one captured request as returned in
// the "requests" list of handleDebugRequests. It mirrors the arguments of
// Server.OnRequest (method, path, status, duration, bodies).
type debugRequestRecord struct {
Time string `json:"time"`
Method string `json:"method"`
Path string `json:"path"`
StatusCode int `json:"statusCode"`
// DurationMS is presumably the request duration in milliseconds — confirm
// against the recorder that fills it in.
DurationMS int64 `json:"durationMs"`
Request string `json:"request,omitempty"`
Response string `json:"response,omitempty"`
}
// NewServer builds a Server bound to addr that serves svc.
//
// It sizes the concurrency channel from maxConcurrentRequests(), registers
// every route on a fresh ServeMux, and wraps the mux with CORS handling and
// the request recorder. The server is returned unstarted; call
// ListenAndServe to begin serving.
func NewServer(addr string, svc *service.Service) *Server {
	srv := &Server{
		svc: svc,
		sem: make(chan struct{}, maxConcurrentRequests()),
	}

	// Route table: several paths are aliases for the same handler so that
	// clients speaking different discovery dialects can find the proxy.
	routes := []struct {
		pattern string
		handler func(http.ResponseWriter, *http.Request)
	}{
		{"/", srv.handleRoot},
		{"/health", srv.handleRoot},
		{"/debug/requests", srv.handleDebugRequests},
		{"/debug/logs", srv.handleDebugRequests},
		{"/api/requests", srv.handleDebugRequests},
		{"/api/logs", srv.handleDebugRequests},
		{"/capabilities", srv.handleCapabilities},
		{"/v1/capabilities", srv.handleCapabilities},
		{"/v1/models", srv.handleModels},
		{"/api/v1/models", srv.handleLMStudioModels},
		{"/api/tags", srv.handleOllamaTags},
		{"/v1/props", srv.handleModelProps},
		{"/props", srv.handleModelProps},
		{"/v1/runtime/status", srv.handleRuntimeStatus},
		{"/runtime/status", srv.handleRuntimeStatus},
		{"/version", srv.handleVersion},
		{"/v1/messages/count_tokens", srv.handleAnthropicCountTokens},
		{"/v1/messages", srv.handleAnthropicMessages},
		{"/v1/chat/completions", srv.handleOpenAIChatCompletions},
		{"/api/v1/chat/completions", srv.handleOpenAIChatCompletions},
	}

	mux := http.NewServeMux()
	for _, route := range routes {
		mux.HandleFunc(route.pattern, route.handler)
	}

	srv.http = &http.Server{
		Addr:              addr,
		Handler:           srv.withRecorder(withCORS(mux)),
		ReadHeaderTimeout: 10 * time.Second,
	}
	return srv
}
// ListenAndServe starts the underlying HTTP server and returns whatever
// error http.Server.ListenAndServe reports.
func (s *Server) ListenAndServe() error {
	httpServer := s.http
	return httpServer.ListenAndServe()
}
// Shutdown stops the HTTP server and then closes the backing service.
//
// A graceful shutdown is attempted first. If it fails, the server is closed
// forcibly: a clean force-close discards the graceful-shutdown error, while a
// failed one is appended to it. The service is always closed afterwards. An
// HTTP-side error takes precedence over the service close error in the
// returned value.
func (s *Server) Shutdown(ctx context.Context) error {
	httpErr := s.http.Shutdown(ctx)
	if httpErr != nil {
		forceErr := s.http.Close()
		if forceErr == nil {
			// The forced close succeeded, so the server is down regardless.
			httpErr = nil
		} else {
			httpErr = fmt.Errorf("%w; force close failed: %v", httpErr, forceErr)
		}
	}

	svcErr := s.svc.Close()
	if httpErr == nil {
		return svcErr
	}
	return httpErr
}
// SetDefaultModel forwards model to the backing service as its default model.
func (s *Server) SetDefaultModel(model string) {
	backend := s.svc
	backend.SetDefaultModel(model)
}
// applyDefaultModel fills req.Model with the service's default model when the
// request's model is empty or whitespace-only; otherwise req is left untouched.
func (s *Server) applyDefaultModel(req *service.ChatRequest) {
	if requested := strings.TrimSpace(req.Model); requested != "" {
		return
	}
	req.Model = s.svc.DefaultModel()
}
// handleRoot serves the root and health-check routes.
//
// Any path other than "/" or "/health" gets a 404, because the "/" mux
// pattern also receives every otherwise-unmatched path. OPTIONS is answered
// with 204, HEAD with 200, GET with a JSON status document that includes the
// service state, and every other method with 405.
func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
	switch r.URL.Path {
	case "/", "/health":
		// Known path; continue with method dispatch.
	default:
		writeOpenAIError(w, http.StatusNotFound, "not_found_error", "not found")
		return
	}

	switch r.Method {
	case http.MethodOptions:
		w.WriteHeader(http.StatusNoContent)
	case http.MethodHead:
		w.WriteHeader(http.StatusOK)
	case http.MethodGet:
		writeJSON(w, http.StatusOK, map[string]any{
			"ok":      true,
			"service": "lingma-proxy",
			"state":   s.svc.State(),
		})
	default:
		writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
	}
}
// handleRuntimeStatus reports the service state for the runtime-status routes.
//
// OPTIONS is answered with 204, HEAD with 200, and any non-GET method with
// 405. For GET, the "ok" field mirrors state.Connected rather than being
// unconditionally true.
func (s *Server) handleRuntimeStatus(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodOptions:
		w.WriteHeader(http.StatusNoContent)
	case http.MethodHead:
		w.WriteHeader(http.StatusOK)
	case http.MethodGet:
		current := s.svc.State()
		writeJSON(w, http.StatusOK, map[string]any{
			"ok":      current.Connected,
			"service": "lingma-proxy",
			"state":   current,
		})
	default:
		writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
	}
}
// handleDebugRequests returns recently recorded requests as JSON.
//
// OPTIONS is answered with 204, HEAD with 200, and any non-GET method with
// 405. The optional "limit" query parameter selects how many records to
// return: it defaults to 50, is clamped to the range [1, 200], and is ignored
// when it is not a valid integer.
func (s *Server) handleDebugRequests(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodOptions:
		w.WriteHeader(http.StatusNoContent)
		return
	case http.MethodHead:
		w.WriteHeader(http.StatusOK)
		return
	case http.MethodGet:
		// Handled below.
	default:
		writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
		return
	}

	limit := 50
	rawLimit := strings.TrimSpace(r.URL.Query().Get("limit"))
	if rawLimit != "" {
		if requested, err := strconv.Atoi(rawLimit); err == nil {
			limit = requested
			if limit < 1 {
				limit = 1
			} else if limit > 200 {
				limit = 200
			}
		}
	}

	recent := s.debugRecords(limit)
	writeJSON(w, http.StatusOK, map[string]any{
		"ok":       true,
		"service":  "lingma-proxy",
		"count":    len(recent),
		"requests": recent,
		"state":    s.svc.State(),
	})
}
// handleModels serves the OpenAI-style model list at /v1/models.
//
// OPTIONS is answered with 204 and any non-GET method with 405. A failure to
// list models from the service yields a 500 "api_error". Every entry shares
// one "created" timestamp taken when the response is built.
func (s *Server) handleModels(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodOptions:
		w.WriteHeader(http.StatusNoContent)
		return
	case http.MethodGet:
		// Handled below.
	default:
		writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
		return
	}

	available, listErr := s.svc.ListModels(r.Context())
	if listErr != nil {
		writeOpenAIError(w, http.StatusInternalServerError, "api_error", listErr.Error())
		return
	}

	entries := make([]modelResponse, 0, len(available))
	stamp := time.Now().Unix()
	for _, m := range available {
		entry := modelResponse{
			ID:      m.ID,
			Object:  "model",
			Created: stamp,
			OwnedBy: "lingma",
			Name:    m.Name,
		}
		entries = append(entries, entry)
	}

	writeJSON(w, http.StatusOK, map[string]any{
		"object": "list",
		"data":   entries,
	})
}
// handleCapabilities serves a static JSON document describing the proxy:
// supported protocols, feature flags, recommended models, per-model metadata
// and the endpoint map. OPTIONS is answered with 204 and any non-GET method
// with 405. Nothing in the document is read from the backing service.
func (s *Server) handleCapabilities(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodOptions:
		w.WriteHeader(http.StatusNoContent)
		return
	case http.MethodGet:
		// Handled below.
	default:
		writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
		return
	}

	protocols := []string{
		"openai.chat_completions",
		"anthropic.messages",
		"lm_studio.discovery",
		"ollama.discovery",
		"llamacpp.discovery",
		"vllm.discovery",
	}

	features := map[string]any{
		"streaming":                true,
		"tools":                    true,
		"tool_prompt_emulation":    true,
		"tool_alias_mapping":       true,
		"images":                   true,
		"local_image_paths":        true,
		"remote_image_urls":        true,
		"image_auto_resize":        true,
		"request_log_image_redact": true,
	}

	recommended := map[string]any{
		"default":     "kmodel",
		"agent_tools": []string{"kmodel", "MiniMax-M2.7", "Qwen3-Coder", "Qwen3.6-Plus"},
		"vision":      []string{"Kimi-K2.6", "Qwen3-Max", "Qwen3.6-Plus", "MiniMax-M2.7", "Auto"},
		"coding":      []string{"kmodel", "Qwen3-Coder", "MiniMax-M2.7"},
	}

	kimi := map[string]any{
		"context_window_tokens": 256000,
		"modalities":            []string{"text", "image", "video"},
		"capabilities":          []string{"agent", "coding", "tool_use", "vision"},
		"basis":                 "official_kimi_docs",
		"source":                "https://platform.kimi.ai/docs/guide/kimi-k2-6-quickstart",
	}
	qwenCoder := map[string]any{
		"context_window_tokens": 256000,
		"context_window_note":   "native 256K; official Qwen material describes extension up to 1M with extrapolation",
		"modalities":            []string{"text"},
		"capabilities":          []string{"agentic_coding", "tool_use"},
		"basis":                 "official_qwen_docs",
		"source":                "https://qwenlm.github.io/blog/qwen3-coder/",
	}
	miniMax := map[string]any{
		"context_window_tokens": 204800,
		"modalities":            []string{"text"},
		"capabilities":          []string{"agent", "coding", "tool_use", "skills"},
		"basis":                 "minimax_and_nvidia_model_cards",
		"source":                "https://developer.nvidia.com/blog/minimax-m2-7-advances-scalable-agentic-workflows-on-nvidia-platforms-for-complex-ai-applications/",
	}
	qwenPlus := map[string]any{
		// Encoded as JSON null: no context length is recorded for this model.
		"context_window_tokens": nil,
		"modalities":            []string{"text", "image"},
		"capabilities":          []string{"general", "vision_observed_via_lingma"},
		"basis":                 "observed_via_lingma_proxy; no official Lingma-specific context length published in this proxy",
	}
	modelMetadata := map[string]any{
		"Kimi-K2.6":    kimi,
		"Qwen3-Coder":  qwenCoder,
		"MiniMax-M2.7": miniMax,
		"Qwen3.6-Plus": qwenPlus,
	}

	endpoints := map[string]any{
		"openai_chat":        []string{"/v1/chat/completions", "/api/v1/chat/completions"},
		"anthropic_messages": "/v1/messages",
		"models":             []string{"/v1/models", "/api/v1/models", "/api/tags"},
		"capabilities":       []string{"/capabilities", "/v1/capabilities"},
		"props":              []string{"/props", "/v1/props"},
		"version":            "/version",
	}

	writeJSON(w, http.StatusOK, map[string]any{
		"service":            "lingma-proxy",
		"protocols":          protocols,
		"features":           features,
		"recommended_models": recommended,
		"model_metadata":     modelMetadata,
		"endpoints":          endpoints,
	})
}
// handleLMStudioModels serves the model list at /api/v1/models in the shape
// built below (id/key/display_name plus a single loaded instance per model).
//
// OPTIONS is answered with 204 and any non-GET method with 405. A failure to
// list models yields a 500 "api_error". Every model is reported with the
// same fixed context length of 128000.
func (s *Server) handleLMStudioModels(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodOptions:
		w.WriteHeader(http.StatusNoContent)
		return
	case http.MethodGet:
		// Handled below.
	default:
		writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
		return
	}

	available, listErr := s.svc.ListModels(r.Context())
	if listErr != nil {
		writeOpenAIError(w, http.StatusInternalServerError, "api_error", listErr.Error())
		return
	}

	entries := make([]map[string]any, 0, len(available))
	for _, m := range available {
		instance := map[string]any{
			"id":    m.ID,
			"model": m.ID,
			"config": map[string]any{
				"context_length": 128000,
			},
		}
		entries = append(entries, map[string]any{
			"id":                 m.ID,
			"key":                m.ID,
			"display_name":       m.Name,
			"type":               "llm",
			"publisher":          "lingma",
			"max_context_length": 128000,
			"loaded_instances":   []map[string]any{instance},
		})
	}

	writeJSON(w, http.StatusOK, map[string]any{"models": entries})
}
// handleOllamaTags serves the model list at /api/tags in the shape built
// below (name/model/modified_at/size/digest/details per model).
//
// OPTIONS is answered with 204 and any non-GET method with 405. A failure to
// list models yields a 500 "api_error". Size, digest, parameter size and
// quantization level are always reported as zero/empty values.
func (s *Server) handleOllamaTags(w http.ResponseWriter, r *http.Request) {
	if r.Method == http.MethodOptions {
		w.WriteHeader(http.StatusNoContent)
		return
	}
	if r.Method != http.MethodGet {
		writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
		return
	}
	models, err := s.svc.ListModels(r.Context())
	if err != nil {
		writeOpenAIError(w, http.StatusInternalServerError, "api_error", err.Error())
		return
	}

	// Read the clock and format the timestamp once per response. This avoids
	// a clock read and a format call per model, and guarantees that every
	// entry in one response carries the same modified_at value even when the
	// loop straddles a second boundary.
	modifiedAt := time.Now().UTC().Format(time.RFC3339)

	items := make([]map[string]any, 0, len(models))
	for _, model := range models {
		items = append(items, map[string]any{
			"name":        model.ID,
			"model":       model.ID,
			"modified_at": modifiedAt,
			"size":        0,
			"digest":      "",
			"details": map[string]any{
				"family":             "lingma",
				"families":           []string{"lingma"},
				"parameter_size":     "",
				"quantization_level": "",
			},
		})
	}
	writeJSON(w, http.StatusOK, map[string]any{"models": items})
}
// handleModelProps serves a small static properties document for the /props
// and /v1/props routes.
//
// OPTIONS is answered with 204 and any non-GET method with 405. The reported
// model alias is the service's default model, falling back to "kmodel" when
// that is empty or whitespace-only. The generation settings are fixed values.
func (s *Server) handleModelProps(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodOptions:
		w.WriteHeader(http.StatusNoContent)
		return
	case http.MethodGet:
		// Handled below.
	default:
		writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
		return
	}

	alias := strings.TrimSpace(s.svc.DefaultModel())
	if alias == "" {
		alias = "kmodel"
	}

	generation := map[string]any{
		"n_ctx":       128000,
		"temperature": 0.7,
		"top_p":       1,
	}
	writeJSON(w, http.StatusOK, map[string]any{
		"model_alias":                 alias,
		"chat_template":               "{{ .Messages }}",
		"default_generation_settings": generation,
	})
}
// handleVersion serves a fixed version document at /version. OPTIONS is
// answered with 204 and any non-GET method with 405. Both the "version" and
// "service" fields carry the literal "lingma-proxy".
func (s *Server) handleVersion(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodOptions:
		w.WriteHeader(http.StatusNoContent)
	case http.MethodGet:
		writeJSON(w, http.StatusOK, map[string]any{
			"version": "lingma-proxy",
			"service": "lingma-proxy",
		})
	default:
		writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
	}
}
// handleAnthropicCountTokens serves /v1/messages/count_tokens.
//
// OPTIONS is answered with 204 and any non-POST method with an
// Anthropic-style 405. A body that fails to decode yields a 400. Otherwise
// the response is {"input_tokens": N}, where N comes from
// estimateAnthropicInputTokens; the backing service is not called.
func (s *Server) handleAnthropicCountTokens(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodOptions:
		w.WriteHeader(http.StatusNoContent)
		return
	case http.MethodPost:
		// Handled below.
	default:
		writeAnthropicError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
		return
	}

	var payload anthropicRequest
	if decodeErr := decodeJSON(r, &payload); decodeErr != nil {
		writeAnthropicError(w, http.StatusBadRequest, "invalid_request_error", decodeErr.Error())
		return
	}

	estimate := estimateAnthropicInputTokens(payload)
	writeJSON(w, http.StatusOK, map[string]any{
		"input_tokens": estimate,
	})
}
// handleAnthropicMessages serves /v1/messages.
//
// Flow:
//  1. OPTIONS is answered with 204; any non-POST method with 405.
//  2. An execution slot is acquired. If the request context ends first, a 408
//     "timeout_error" is returned.
//  3. The body is decoded (400 on failure) and logged to stdout.
//  4. If anthropicHostedWebSearchCall recognises the request, a synthetic
//     tool_use reply is written without calling the service.
//  5. Otherwise the request is normalized (400 on failure), given the default
//     model if it has none, and dispatched to the streaming handler or to
//     svc.Generate (500 "api_error" on failure).
//
// In the non-streaming response, the leading text block is omitted when the
// result has tool calls and its text is blank. This matches the aggregated
// streaming path, which also skips an empty text block, and avoids handing
// clients an empty text block alongside tool_use blocks. When there are no
// tool calls the text block is always present, so "content" is never empty.
func (s *Server) handleAnthropicMessages(w http.ResponseWriter, r *http.Request) {
	if r.Method == http.MethodOptions {
		w.WriteHeader(http.StatusNoContent)
		return
	}
	if r.Method != http.MethodPost {
		writeAnthropicError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
		return
	}
	if !s.acquire(r.Context()) {
		writeAnthropicError(w, http.StatusRequestTimeout, "timeout_error", "request was cancelled while waiting for a proxy execution slot")
		return
	}
	defer s.release()

	var req anthropicRequest
	if err := decodeJSON(r, &req); err != nil {
		writeAnthropicError(w, http.StatusBadRequest, "invalid_request_error", err.Error())
		return
	}
	// NOTE(review): this writes the complete request body (prompts and any
	// inline image data) to stdout on every call. A marshal failure is
	// deliberately ignored: the log line is best-effort diagnostics.
	if reqBody, _ := json.Marshal(req); len(reqBody) > 0 {
		fmt.Printf("[ANTHROPIC REQUEST] %s\n", string(reqBody))
	}

	// Hosted web-search requests are answered locally with a tool_use block.
	if call, ok := anthropicHostedWebSearchCall(req); ok {
		if req.Stream {
			s.writeAnthropicHostedToolStream(w, req.Model, call)
			return
		}
		s.writeAnthropicHostedToolResponse(w, req.Model, call)
		return
	}

	normalized, err := normalizeAnthropicRequest(req)
	if err != nil {
		writeAnthropicError(w, http.StatusBadRequest, "invalid_request_error", err.Error())
		return
	}
	s.applyDefaultModel(&normalized)
	if req.Stream {
		s.handleAnthropicStream(w, r, normalized)
		return
	}

	result, err := s.svc.Generate(r.Context(), normalized)
	if err != nil {
		writeAnthropicError(w, http.StatusInternalServerError, "api_error", err.Error())
		return
	}

	hasToolCalls := len(result.ToolCalls) > 0
	content := make([]map[string]any, 0, 1+len(result.ToolCalls))
	// Keep the text block unless it would be an empty block sitting next to
	// tool_use blocks.
	if !hasToolCalls || strings.TrimSpace(result.Text) != "" {
		content = append(content, map[string]any{"type": "text", "text": result.Text})
	}
	for _, tc := range result.ToolCalls {
		content = append(content, map[string]any{
			"type":  "tool_use",
			"id":    tc.ID,
			"name":  tc.Name,
			"input": tc.Arguments,
		})
	}
	stopReason := "end_turn"
	if hasToolCalls {
		stopReason = "tool_use"
	}

	writeJSON(w, http.StatusOK, map[string]any{
		"id":            fmt.Sprintf("msg_%d", time.Now().UnixNano()),
		"type":          "message",
		"role":          "assistant",
		"content":       content,
		"model":         result.Model,
		"stop_reason":   stopReason,
		"stop_sequence": nil,
		"usage": map[string]any{
			"input_tokens":  result.InputTokens,
			"output_tokens": result.OutputTokens,
		},
	})
}
// handleOpenAIChatCompletions serves the OpenAI-style chat-completions routes.
//
// OPTIONS is answered with 204 and any non-POST method with 405. The handler
// then waits for an execution slot (408 "timeout_error" if the request
// context ends first), decodes and normalizes the body (400 on either
// failure), applies the default model, and either hands off to the streaming
// handler or calls svc.Generate (500 "api_error" on failure) and writes a
// single chat.completion object. Tool-call arguments are serialized to a JSON
// string; finish_reason is "tool_calls" when any tool call is present and
// "stop" otherwise.
func (s *Server) handleOpenAIChatCompletions(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodOptions:
		w.WriteHeader(http.StatusNoContent)
		return
	case http.MethodPost:
		// Handled below.
	default:
		writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
		return
	}

	if !s.acquire(r.Context()) {
		writeOpenAIError(w, http.StatusRequestTimeout, "timeout_error", "request was cancelled while waiting for a proxy execution slot")
		return
	}
	defer s.release()

	var payload openAIChatRequest
	if decodeErr := decodeJSON(r, &payload); decodeErr != nil {
		writeOpenAIError(w, http.StatusBadRequest, "invalid_request_error", decodeErr.Error())
		return
	}
	chatReq, normErr := normalizeOpenAIRequest(payload)
	if normErr != nil {
		writeOpenAIError(w, http.StatusBadRequest, "invalid_request_error", normErr.Error())
		return
	}
	s.applyDefaultModel(&chatReq)

	if payload.Stream {
		s.handleOpenAIStream(w, r, chatReq)
		return
	}

	result, genErr := s.svc.Generate(r.Context(), chatReq)
	if genErr != nil {
		writeOpenAIError(w, http.StatusInternalServerError, "api_error", genErr.Error())
		return
	}

	createdAt := time.Now().Unix()
	reply := map[string]any{
		"role":    "assistant",
		"content": result.Text,
	}
	finish := "stop"
	if n := len(result.ToolCalls); n > 0 {
		calls := make([]map[string]any, 0, n)
		for _, tc := range result.ToolCalls {
			// Marshal errors are ignored, as before: the arguments field is
			// then sent as whatever bytes were produced (possibly empty).
			encodedArgs, _ := json.Marshal(tc.Arguments)
			fn := map[string]any{
				"name":      tc.Name,
				"arguments": string(encodedArgs),
			}
			calls = append(calls, map[string]any{
				"id":       tc.ID,
				"type":     "function",
				"function": fn,
			})
		}
		reply["tool_calls"] = calls
		finish = "tool_calls"
	}

	choice := map[string]any{
		"index":         0,
		"message":       reply,
		"finish_reason": finish,
	}
	usage := map[string]any{
		"prompt_tokens":     result.InputTokens,
		"completion_tokens": result.OutputTokens,
		"total_tokens":      result.InputTokens + result.OutputTokens,
	}
	writeJSON(w, http.StatusOK, map[string]any{
		"id":      fmt.Sprintf("chatcmpl-%d", time.Now().UnixNano()),
		"object":  "chat.completion",
		"created": createdAt,
		"model":   result.Model,
		"choices": []map[string]any{choice},
		"usage":   usage,
	})
}
// handleAnthropicStream writes a chat response as Anthropic-style server-sent
// events (message_start, content_block_*, message_delta, message_stop).
//
// Two paths exist:
//   - Aggregated: when shouldAggregateToolStream(req) is true, the full result
//     is produced with svc.Generate first and then replayed as a fixed event
//     sequence (optional text block, one tool_use block per tool call).
//   - Incremental: otherwise deltas from svc.GenerateStream are relayed as
//     text_delta events on content block 0 as they arrive, followed by any
//     tool_use blocks from the final result.
//
// Errors that occur before any SSE output are reported as a normal JSON error
// response; errors after streaming has started are reported as an "error"
// event. A failed write to the client ends the handler silently.
func (s *Server) handleAnthropicStream(w http.ResponseWriter, r *http.Request, req service.ChatRequest) {
// Streaming needs a ResponseWriter that can flush partial output.
flusher, ok := w.(http.Flusher)
if !ok {
writeAnthropicError(w, http.StatusInternalServerError, "api_error", "streaming is not supported by this server")
return
}
// The model echoed in message_start comes from the request, with "lingma"
// as the placeholder when the request carries none.
model := strings.TrimSpace(req.Model)
if model == "" {
model = "lingma"
}
msgID := fmt.Sprintf("msg_%d", time.Now().UnixNano())
// ---- Aggregated path: generate everything, then replay as events. ----
if shouldAggregateToolStream(req) {
result, err := s.svc.Generate(r.Context(), req)
if err != nil {
// Nothing has been streamed yet, so a plain JSON error is still possible.
writeAnthropicError(w, http.StatusInternalServerError, "api_error", err.Error())
return
}
streamingHeaders(w)
if err := writeSSEEvent(w, flusher, "message_start", map[string]any{
"type": "message_start",
"message": map[string]any{
"id": msgID,
"type": "message",
"role": "assistant",
"content": []any{},
"model": model,
"stop_reason": nil,
"stop_sequence": nil,
"usage": map[string]any{
"input_tokens": result.InputTokens,
"output_tokens": 0,
},
},
}); err != nil {
return
}
// index is the running content-block index; the text block (if any) takes
// 0 and tool_use blocks follow.
index := 0
// A blank text result produces no text block at all.
if strings.TrimSpace(result.Text) != "" {
if err := writeSSEEvent(w, flusher, "content_block_start", map[string]any{
"type": "content_block_start",
"index": index,
"content_block": map[string]any{"type": "text", "text": ""},
}); err != nil {
return
}
if err := writeSSEEvent(w, flusher, "content_block_delta", map[string]any{
"type": "content_block_delta",
"index": index,
"delta": map[string]any{"type": "text_delta", "text": result.Text},
}); err != nil {
return
}
if err := writeSSEEvent(w, flusher, "content_block_stop", map[string]any{
"type": "content_block_stop",
"index": index,
}); err != nil {
return
}
index++
}
// Each tool call becomes start / single input_json_delta / stop.
for _, tc := range result.ToolCalls {
if err := writeSSEEvent(w, flusher, "content_block_start", map[string]any{
"type": "content_block_start",
"index": index,
"content_block": map[string]any{"type": "tool_use", "id": tc.ID, "name": tc.Name, "input": map[string]any{}},
}); err != nil {
return
}
// The marshal error is ignored; the arguments are sent as one JSON fragment.
argsJSON, _ := json.Marshal(tc.Arguments)
if err := writeSSEEvent(w, flusher, "content_block_delta", map[string]any{
"type": "content_block_delta",
"index": index,
"delta": map[string]any{"type": "input_json_delta", "partial_json": string(argsJSON)},
}); err != nil {
return
}
if err := writeSSEEvent(w, flusher, "content_block_stop", map[string]any{
"type": "content_block_stop",
"index": index,
}); err != nil {
return
}
index++
}
stopReason := "end_turn"
if len(result.ToolCalls) > 0 {
stopReason = "tool_use"
}
// The closing events are best-effort: write errors are ignored because
// nothing follows them.
_ = writeSSEEvent(w, flusher, "message_delta", map[string]any{
"type": "message_delta",
"delta": map[string]any{
"stop_reason": stopReason,
"stop_sequence": nil,
},
"usage": map[string]any{
"output_tokens": result.OutputTokens,
},
})
_ = writeSSEEvent(w, flusher, "message_stop", map[string]any{"type": "message_stop"})
return
}
// ---- Incremental path: relay deltas as they arrive. ----
events, done, err := s.svc.GenerateStream(r.Context(), req)
if err != nil {
writeAnthropicError(w, http.StatusInternalServerError, "api_error", err.Error())
return
}
streamingHeaders(w)
// input_tokens is reported as 0 here because no result exists yet.
if err := writeSSEEvent(w, flusher, "message_start", map[string]any{
"type": "message_start",
"message": map[string]any{
"id": msgID,
"type": "message",
"role": "assistant",
"content": []any{},
"model": model,
"stop_reason": nil,
"stop_sequence": nil,
"usage": map[string]any{
"input_tokens": 0,
"output_tokens": 0,
},
},
}); err != nil {
return
}
// Content block 0 is always opened as the text block on this path.
if err := writeSSEEvent(w, flusher, "content_block_start", map[string]any{
"type": "content_block_start",
"index": 0,
"content_block": map[string]any{
"type": "text",
"text": "",
},
}); err != nil {
return
}
// The filter is enabled only when the request has tools. With the current
// shouldAggregateToolStream (true whenever len(req.Tools) > 0) such requests
// never reach this point, so the filter passes deltas through here.
filter := newToolStreamFilter(len(req.Tools) > 0)
// Local copies so that a finished channel can be set to nil, which disables
// its select case; the loop ends once both are nil.
eventsCh := events
doneCh := done
var final *service.ChatResult
var finalErr error
for eventsCh != nil || doneCh != nil {
select {
case <-r.Context().Done():
// Client went away or the request was cancelled: stop without further output.
return
case event, ok := <-eventsCh:
if !ok {
eventsCh = nil
continue
}
for _, delta := range filter.Push(event.Delta) {
if delta == "" {
continue
}
if err := writeSSEEvent(w, flusher, "content_block_delta", map[string]any{
"type": "content_block_delta",
"index": 0,
"delta": map[string]any{
"type": "text_delta",
"text": delta,
},
}); err != nil {
return
}
}
case result, ok := <-doneCh:
if !ok {
doneCh = nil
continue
}
// Only the first value from done is consumed.
final = result.Result
finalErr = result.Err
doneCh = nil
}
}
// Headers are already sent, so failures are reported as an SSE "error" event.
if finalErr != nil {
_ = writeSSEEvent(w, flusher, "error", map[string]any{
"type": "error",
"error": map[string]any{
"type": "api_error",
"message": finalErr.Error(),
},
})
return
}
// done closed without delivering a value.
if final == nil {
_ = writeSSEEvent(w, flusher, "error", map[string]any{
"type": "error",
"error": map[string]any{
"type": "api_error",
"message": "stream finished without a final result",
},
})
return
}
// Text still held back by the filter is released only when the final result
// has no tool calls.
if len(final.ToolCalls) == 0 {
for _, delta := range filter.Flush() {
if delta == "" {
continue
}
if err := writeSSEEvent(w, flusher, "content_block_delta", map[string]any{
"type": "content_block_delta",
"index": 0,
"delta": map[string]any{
"type": "text_delta",
"text": delta,
},
}); err != nil {
return
}
}
}
if err := writeSSEEvent(w, flusher, "content_block_stop", map[string]any{
"type": "content_block_stop",
"index": 0,
}); err != nil {
return
}
// Tool calls follow the text block at indices 1..n. Unlike the aggregated
// path, write errors are ignored here.
for i, tc := range final.ToolCalls {
_ = writeSSEEvent(w, flusher, "content_block_start", map[string]any{
"type": "content_block_start",
"index": i + 1,
"content_block": map[string]any{"type": "tool_use", "id": tc.ID, "name": tc.Name, "input": map[string]any{}},
})
argsJSON, _ := json.Marshal(tc.Arguments)
_ = writeSSEEvent(w, flusher, "content_block_delta", map[string]any{
"type": "content_block_delta",
"index": i + 1,
"delta": map[string]any{"type": "input_json_delta", "partial_json": string(argsJSON)},
})
_ = writeSSEEvent(w, flusher, "content_block_stop", map[string]any{
"type": "content_block_stop",
"index": i + 1,
})
}
stopReason := "end_turn"
if len(final.ToolCalls) > 0 {
stopReason = "tool_use"
}
if err := writeSSEEvent(w, flusher, "message_delta", map[string]any{
"type": "message_delta",
"delta": map[string]any{
"stop_reason": stopReason,
"stop_sequence": nil,
},
"usage": map[string]any{
"output_tokens": final.OutputTokens,
},
}); err != nil {
return
}
_ = writeSSEEvent(w, flusher, "message_stop", map[string]any{
"type": "message_stop",
})
}
// writeAnthropicHostedToolResponse writes a complete (non-streaming)
// Anthropic-style message whose only content block is a tool_use for call.
// A blank model is reported as "lingma". Usage counts are always zero
// because this reply is built locally from call, without a service request.
func (s *Server) writeAnthropicHostedToolResponse(w http.ResponseWriter, model string, call toolemulation.ToolCall) {
	if model = strings.TrimSpace(model); model == "" {
		model = "lingma"
	}

	toolUse := map[string]any{
		"type":  "tool_use",
		"id":    call.ID,
		"name":  call.Name,
		"input": call.Arguments,
	}
	zeroUsage := map[string]any{
		"input_tokens":  0,
		"output_tokens": 0,
	}

	writeJSON(w, http.StatusOK, map[string]any{
		"id":            fmt.Sprintf("msg_%d", time.Now().UnixNano()),
		"type":          "message",
		"role":          "assistant",
		"content":       []map[string]any{toolUse},
		"model":         model,
		"stop_reason":   "tool_use",
		"stop_sequence": nil,
		"usage":         zeroUsage,
	})
}
// writeAnthropicHostedToolStream is the SSE counterpart of
// writeAnthropicHostedToolResponse. It emits a fixed event sequence for a
// single tool_use block: message_start, content_block_start,
// content_block_delta (the arguments as one JSON fragment),
// content_block_stop, message_delta and message_stop.
//
// If the ResponseWriter cannot flush, a 500 JSON error is written instead.
// A write failure on any of the first four events aborts the stream; the two
// closing events are best-effort. A blank model is reported as "lingma" and
// all usage counts are zero.
func (s *Server) writeAnthropicHostedToolStream(w http.ResponseWriter, model string, call toolemulation.ToolCall) {
	flusher, canFlush := w.(http.Flusher)
	if !canFlush {
		writeAnthropicError(w, http.StatusInternalServerError, "api_error", "streaming is not supported by this server")
		return
	}
	if model = strings.TrimSpace(model); model == "" {
		model = "lingma"
	}

	streamingHeaders(w)
	messageID := fmt.Sprintf("msg_%d", time.Now().UnixNano())
	// The marshal error is ignored, as before; the arguments are sent as-is.
	encodedArgs, _ := json.Marshal(call.Arguments)

	messageStart := map[string]any{
		"type": "message_start",
		"message": map[string]any{
			"id":            messageID,
			"type":          "message",
			"role":          "assistant",
			"content":       []any{},
			"model":         model,
			"stop_reason":   nil,
			"stop_sequence": nil,
			"usage": map[string]any{
				"input_tokens":  0,
				"output_tokens": 0,
			},
		},
	}
	blockStart := map[string]any{
		"type":          "content_block_start",
		"index":         0,
		"content_block": map[string]any{"type": "tool_use", "id": call.ID, "name": call.Name, "input": map[string]any{}},
	}
	blockDelta := map[string]any{
		"type":  "content_block_delta",
		"index": 0,
		"delta": map[string]any{"type": "input_json_delta", "partial_json": string(encodedArgs)},
	}
	blockStop := map[string]any{
		"type":  "content_block_stop",
		"index": 0,
	}
	messageDelta := map[string]any{
		"type": "message_delta",
		"delta": map[string]any{
			"stop_reason":   "tool_use",
			"stop_sequence": nil,
		},
		"usage": map[string]any{
			"output_tokens": 0,
		},
	}

	if err := writeSSEEvent(w, flusher, "message_start", messageStart); err != nil {
		return
	}
	if err := writeSSEEvent(w, flusher, "content_block_start", blockStart); err != nil {
		return
	}
	if err := writeSSEEvent(w, flusher, "content_block_delta", blockDelta); err != nil {
		return
	}
	if err := writeSSEEvent(w, flusher, "content_block_stop", blockStop); err != nil {
		return
	}
	// Closing events: nothing follows them, so write errors are not acted on.
	_ = writeSSEEvent(w, flusher, "message_delta", messageDelta)
	_ = writeSSEEvent(w, flusher, "message_stop", map[string]any{"type": "message_stop"})
}
// handleOpenAIStream writes a chat response as OpenAI-style
// "chat.completion.chunk" server-sent events terminated by "data: [DONE]".
//
// Two paths exist:
//   - Aggregated: when shouldAggregateToolStream(req) is true, the full result
//     is produced with svc.Generate first and then replayed as a role chunk,
//     an optional content chunk, one chunk per tool call and a finish chunk.
//   - Incremental: otherwise deltas from svc.GenerateStream are relayed as
//     content chunks as they arrive, followed by any tool-call chunks from the
//     final result and a finish chunk.
//
// Errors that occur before any SSE output are reported as a normal JSON error
// response; errors after streaming has started are sent as an error chunk
// followed by [DONE].
func (s *Server) handleOpenAIStream(w http.ResponseWriter, r *http.Request, req service.ChatRequest) {
// Streaming needs a ResponseWriter that can flush partial output.
flusher, ok := w.(http.Flusher)
if !ok {
writeOpenAIError(w, http.StatusInternalServerError, "api_error", "streaming is not supported by this server")
return
}
// The model echoed in every chunk comes from the request, with "lingma" as
// the placeholder when the request carries none.
model := strings.TrimSpace(req.Model)
if model == "" {
model = "lingma"
}
// chatID and created are shared by every chunk of this response.
chatID := fmt.Sprintf("chatcmpl-%d", time.Now().UnixNano())
created := time.Now().Unix()
// ---- Aggregated path: generate everything, then replay as chunks. ----
if shouldAggregateToolStream(req) {
result, err := s.svc.Generate(r.Context(), req)
if err != nil {
// Nothing has been streamed yet, so a plain JSON error is still possible.
writeOpenAIError(w, http.StatusInternalServerError, "api_error", err.Error())
return
}
streamingHeaders(w)
// Write errors on this path are ignored throughout (best-effort replay).
_ = writeOpenAIChunk(w, flusher, map[string]any{
"id": chatID, "object": "chat.completion.chunk", "created": created, "model": model,
"choices": []map[string]any{{"index": 0, "delta": map[string]any{"role": "assistant"}, "finish_reason": nil}},
})
// Unlike the Anthropic aggregated path, the check is for a non-empty string,
// so whitespace-only text is still sent.
if result.Text != "" {
_ = writeOpenAIChunk(w, flusher, map[string]any{
"id": chatID, "object": "chat.completion.chunk", "created": created, "model": model,
"choices": []map[string]any{{"index": 0, "delta": map[string]any{"content": result.Text}, "finish_reason": nil}},
})
}
// One chunk per tool call; arguments are sent whole as a JSON string.
for i, tc := range result.ToolCalls {
argsJSON, _ := json.Marshal(tc.Arguments)
_ = writeOpenAIChunk(w, flusher, map[string]any{
"id": chatID, "object": "chat.completion.chunk", "created": created, "model": model,
"choices": []map[string]any{{
"index": 0,
"delta": map[string]any{
"tool_calls": []map[string]any{{
"index": i, "id": tc.ID, "type": "function",
"function": map[string]any{"name": tc.Name, "arguments": string(argsJSON)},
}},
},
"finish_reason": nil,
}},
})
}
finishReason := "stop"
if len(result.ToolCalls) > 0 {
finishReason = "tool_calls"
}
_ = writeOpenAIChunk(w, flusher, map[string]any{
"id": chatID, "object": "chat.completion.chunk", "created": created, "model": model,
"choices": []map[string]any{{"index": 0, "delta": map[string]any{}, "finish_reason": finishReason}},
})
// Stream terminator.
_, _ = fmt.Fprint(w, "data: [DONE]\n\n")
flusher.Flush()
return
}
// ---- Incremental path: relay deltas as they arrive. ----
events, done, err := s.svc.GenerateStream(r.Context(), req)
if err != nil {
writeOpenAIError(w, http.StatusInternalServerError, "api_error", err.Error())
return
}
streamingHeaders(w)
// Opening chunk announces the assistant role.
if err := writeOpenAIChunk(w, flusher, map[string]any{
"id": chatID,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": []map[string]any{
{
"index": 0,
"delta": map[string]any{
"role": "assistant",
},
"finish_reason": nil,
},
},
}); err != nil {
return
}
// The filter is enabled only when the request has tools. With the current
// shouldAggregateToolStream (true whenever len(req.Tools) > 0) such requests
// never reach this point, so the filter passes deltas through here.
filter := newToolStreamFilter(len(req.Tools) > 0)
// Local copies so that a finished channel can be set to nil, which disables
// its select case; the loop ends once both are nil.
eventsCh := events
doneCh := done
var final *service.ChatResult
var finalErr error
for eventsCh != nil || doneCh != nil {
select {
case <-r.Context().Done():
// Client went away or the request was cancelled: stop without further output.
return
case event, ok := <-eventsCh:
if !ok {
eventsCh = nil
continue
}
for _, delta := range filter.Push(event.Delta) {
if delta == "" {
continue
}
if err := writeOpenAIChunk(w, flusher, map[string]any{
"id": chatID,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": []map[string]any{
{
"index": 0,
"delta": map[string]any{
"content": delta,
},
"finish_reason": nil,
},
},
}); err != nil {
return
}
}
case result, ok := <-doneCh:
if !ok {
doneCh = nil
continue
}
// Only the first value from done is consumed.
final = result.Result
finalErr = result.Err
doneCh = nil
}
}
// Headers are already sent, so failures are reported in-band as an error
// chunk followed by the [DONE] terminator.
if finalErr != nil {
_ = writeOpenAIChunk(w, flusher, map[string]any{
"error": map[string]any{
"message": finalErr.Error(),
"type": "api_error",
"code": nil,
"param": nil,
},
})
_, _ = fmt.Fprint(w, "data: [DONE]\n\n")
flusher.Flush()
return
}
// done closed without delivering a value.
if final == nil {
_ = writeOpenAIChunk(w, flusher, map[string]any{
"error": map[string]any{
"message": "stream finished without a final result",
"type": "api_error",
"code": nil,
"param": nil,
},
})
_, _ = fmt.Fprint(w, "data: [DONE]\n\n")
flusher.Flush()
return
}
// Text still held back by the filter is released only when the final result
// has no tool calls.
if len(final.ToolCalls) == 0 {
for _, delta := range filter.Flush() {
if delta == "" {
continue
}
if err := writeOpenAIChunk(w, flusher, map[string]any{
"id": chatID,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": []map[string]any{
{
"index": 0,
"delta": map[string]any{
"content": delta,
},
"finish_reason": nil,
},
},
}); err != nil {
return
}
}
}
// One chunk per tool call; write errors are ignored here.
for i, tc := range final.ToolCalls {
argsJSON, _ := json.Marshal(tc.Arguments)
_ = writeOpenAIChunk(w, flusher, map[string]any{
"id": chatID, "object": "chat.completion.chunk", "created": created, "model": model,
"choices": []map[string]any{{
"index": 0,
"delta": map[string]any{
"tool_calls": []map[string]any{{
"index": i, "id": tc.ID, "type": "function",
"function": map[string]any{"name": tc.Name, "arguments": string(argsJSON)},
}},
},
"finish_reason": nil,
}},
})
}
finishReason := "stop"
if len(final.ToolCalls) > 0 {
finishReason = "tool_calls"
}
// Final chunk carries an empty delta and the finish reason.
if err := writeOpenAIChunk(w, flusher, map[string]any{
"id": chatID,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": []map[string]any{
{
"index": 0,
"delta": map[string]any{},
"finish_reason": finishReason,
},
},
}); err != nil {
return
}
// Stream terminator.
_, _ = fmt.Fprint(w, "data: [DONE]\n\n")
flusher.Flush()
}
// shouldAggregateToolStream reports whether a streaming request should be
// answered by generating the whole result first and replaying it, which is
// the case exactly when the request declares at least one tool.
func shouldAggregateToolStream(req service.ChatRequest) bool {
	toolCount := len(req.Tools)
	return toolCount > 0
}
// toolStreamFilter holds back streamed text that looks like the start of a
// tool "action" block so it is not forwarded to the client as plain text.
type toolStreamFilter struct {
// enabled turns filtering on; when false, Push returns every delta unchanged.
enabled bool
// buffer is the text received but not yet released by Push/Flush.
buffer string
// blocked is set once an action-block marker has been found; from then on
// Push and Flush release nothing further.
blocked bool
}
// newToolStreamFilter returns an empty, unblocked filter. When enabled is
// false the filter passes every delta straight through.
func newToolStreamFilter(enabled bool) *toolStreamFilter {
	filter := new(toolStreamFilter)
	filter.enabled = enabled
	return filter
}
// Push feeds one streamed delta into the filter and returns the text that is
// safe to forward right now (possibly nothing).
//
// Rules, in order:
//   - an empty delta yields nothing;
//   - a disabled filter returns the delta unchanged;
//   - once blocked, text is still appended to the buffer but never released;
//   - if the buffer now contains an action-block marker, the text before the
//     marker is released and the filter becomes blocked;
//   - if the whole buffer could still be the beginning of a marker, everything
//     is held back;
//   - otherwise all but the last 96 runes are released, keeping a tail in
//     which a marker split across deltas can still be detected.
func (f *toolStreamFilter) Push(delta string) []string {
	switch {
	case delta == "":
		return nil
	case !f.enabled:
		return []string{delta}
	}

	f.buffer += delta
	if f.blocked {
		return nil
	}

	markerAt := actionBlockStartIndex(f.buffer)
	if markerAt < 0 {
		if looksLikeActionPrefix(f.buffer) {
			return nil
		}
		return f.flushSafeTail(96)
	}

	visible, held := f.buffer[:markerAt], f.buffer[markerAt:]
	f.buffer = held
	f.blocked = true
	if visible == "" {
		return nil
	}
	return []string{visible}
}
// Flush returns whatever text is still buffered as a single element and
// empties the buffer. It returns nil when the buffer is empty or when the
// filter is blocked; in the blocked case the buffer is left untouched.
func (f *toolStreamFilter) Flush() []string {
	if f.blocked || len(f.buffer) == 0 {
		return nil
	}
	pending := []string{f.buffer}
	f.buffer = ""
	return pending
}
// flushSafeTail releases the front of the buffer while keeping the last
// tailRunes runes buffered. It returns nil when the buffer holds tailRunes
// runes or fewer.
func (f *toolStreamFilter) flushSafeTail(tailRunes int) []string {
	all := []rune(f.buffer)
	cut := len(all) - tailRunes
	if cut <= 0 {
		return nil
	}
	head, tail := string(all[:cut]), string(all[cut:])
	f.buffer = tail
	if head == "" {
		return nil
	}
	return []string{head}
}
// actionBlockStartIndex returns the byte offset in text of the earliest
// action-block marker, matched without regard to ASCII letter case, or -1
// when no marker is present.
//
// The returned offset is valid for slicing text itself.
func actionBlockStartIndex(text string) int {
	// Fold ASCII letters byte for byte instead of calling strings.ToLower:
	// Unicode lowering can change the encoded length of a character, which
	// would make offsets found in the folded copy invalid for text. All
	// markers are ASCII, so ASCII folding is sufficient.
	folded := []byte(text)
	for i, c := range folded {
		if 'A' <= c && c <= 'Z' {
			folded[i] = c + ('a' - 'A')
		}
	}
	lower := string(folded)

	markers := [...]string{
		"```json action",
		"``` action",
		"{\"tool\"",
		"{\"name\"",
	}
	best := -1
	for _, marker := range markers {
		if idx := strings.Index(lower, marker); idx >= 0 && (best == -1 || idx < best) {
			best = idx
		}
	}
	return best
}
// looksLikeActionPrefix reports whether text, after dropping leading
// whitespace and lowercasing, is empty, is the beginning of an action-block
// marker, or already begins with a complete marker.
func looksLikeActionPrefix(text string) bool {
	candidate := strings.TrimLeft(text, " \t\r\n")
	candidate = strings.ToLower(candidate)
	if len(candidate) == 0 {
		return true
	}
	for _, marker := range [...]string{
		"```json action",
		"``` action",
		"{\"tool\"",
		"{\"name\"",
	} {
		// One string being a prefix of the other is the same as the longer
		// one starting with the shorter one.
		short, long := candidate, marker
		if len(short) > len(long) {
			short, long = long, short
		}
		if strings.HasPrefix(long, short) {
			return true
		}
	}
	return false
}
// truthyEnv reports whether the environment variable name, trimmed and
// lowercased, is one of "1", "true", "yes" or "on".
func truthyEnv(name string) bool {
	switch strings.ToLower(strings.TrimSpace(os.Getenv(name))) {
	case "1", "true", "yes", "on":
		return true
	}
	return false
}
// anthropicHostedWebSearchCall builds a "web_search" tool call from req and
// reports whether one was produced.
//
// A call is produced only when the tool list contains a hosted web_search
// tool, no message carries a tool_result block, the search counts as
// requested (see anthropicHostedWebSearchRequested), and a non-empty query
// can be taken from the messages. The call ID is "toolu_" followed by the
// current Unix time in nanoseconds.
func anthropicHostedWebSearchCall(req anthropicRequest) (toolemulation.ToolCall, bool) {
	var none toolemulation.ToolCall

	eligible := hasAnthropicHostedWebSearchTool(req.Tools) &&
		!hasAnthropicToolResult(req.Messages) &&
		anthropicHostedWebSearchRequested(req.Tools, req.ToolChoice)
	if !eligible {
		return none, false
	}

	query := anthropicHostedWebSearchQuery(req.Messages)
	if query == "" {
		return none, false
	}

	call := toolemulation.ToolCall{
		ID:        fmt.Sprintf("toolu_%d", time.Now().UnixNano()),
		Name:      "web_search",
		Arguments: map[string]any{"query": query},
	}
	return call, true
}
// hasAnthropicToolResult reports whether any message has list content that
// includes a block whose "type" is "tool_result".
func hasAnthropicToolResult(messages []rawMessage) bool {
	for i := range messages {
		blocks, isList := messages[i].Content.([]any)
		if !isList {
			continue
		}
		for _, block := range blocks {
			fields, isMap := block.(map[string]any)
			if isMap && stringFromAny(fields["type"]) == "tool_result" {
				return true
			}
		}
	}
	return false
}
// estimateAnthropicInputTokens returns a rough token count for req: the
// model, system, messages, tools, tool_choice and thinking fields are
// JSON-encoded and the number of runes is divided by three, rounding up.
// The result is never below 1; 1 is also returned when encoding fails.
func estimateAnthropicInputTokens(req anthropicRequest) int {
	encoded, err := json.Marshal(map[string]any{
		"model":       req.Model,
		"system":      req.System,
		"messages":    req.Messages,
		"tools":       req.Tools,
		"tool_choice": req.ToolChoice,
		"thinking":    req.Thinking,
	})
	if err != nil {
		return 1
	}

	count := 0
	for range string(encoded) {
		count++
	}
	if count == 0 {
		return 1
	}

	estimate := (count + 2) / 3
	if estimate < 1 {
		return 1
	}
	return estimate
}
// hasAnthropicHostedWebSearchTool reports whether raw is a list containing a
// tool object named "web_search" whose "type" is accepted by
// toolemulation.IsAnthropicHostedToolType.
func hasAnthropicHostedWebSearchTool(raw any) bool {
	tools, isList := raw.([]any)
	if !isList {
		return false
	}
	for _, tool := range tools {
		fields, isMap := tool.(map[string]any)
		if !isMap {
			continue
		}
		if strings.TrimSpace(stringFromAny(fields["name"])) != "web_search" {
			continue
		}
		if toolemulation.IsAnthropicHostedToolType(stringFromAny(fields["type"])) {
			return true
		}
	}
	return false
}
// anthropicHostedWebSearchRequested reports whether the request asks for the
// hosted web search: either choice is an object naming "web_search", or tools
// is a list with exactly one entry and that entry is a hosted web_search tool.
func anthropicHostedWebSearchRequested(tools any, choice any) bool {
	if named, ok := choice.(map[string]any); ok &&
		strings.TrimSpace(stringFromAny(named["name"])) == "web_search" {
		return true
	}

	list, ok := tools.([]any)
	if !ok || len(list) != 1 {
		return false
	}
	only, ok := list[0].(map[string]any)
	if !ok {
		return false
	}
	if strings.TrimSpace(stringFromAny(only["name"])) != "web_search" {
		return false
	}
	return toolemulation.IsAnthropicHostedToolType(stringFromAny(only["type"]))
}
// anthropicHostedWebSearchQuery walks the messages from last to first and
// returns the cleaned text of the most recent user message that has any text.
// It returns "" when there is no such message.
func anthropicHostedWebSearchQuery(messages []rawMessage) string {
	for remaining := len(messages); remaining > 0; remaining-- {
		msg := &messages[remaining-1]
		role := strings.ToLower(strings.TrimSpace(msg.Role))
		if role != "user" {
			continue
		}
		if body := strings.TrimSpace(extractText(msg.Content)); body != "" {
			return cleanHostedWebSearchQuery(body)
		}
	}
	return ""
}
// cleanHostedWebSearchQuery reduces a user message to a bare search query.
//
// The text is trimmed; if it contains one of the known instruction prefixes
// (matched without regard to ASCII letter case, first listed prefix wins),
// everything up to and including that prefix is dropped. Surrounding
// whitespace, quotes and backticks are then removed.
func cleanHostedWebSearchQuery(text string) string {
	text = strings.TrimSpace(text)
	prefixes := []string{
		"Perform a web search for the query:",
		"Search the web for:",
		"Web search query:",
	}

	// Fold ASCII letters byte for byte so offsets found in the folded copy
	// stay valid for text. strings.ToLower can change the byte length of
	// non-ASCII characters, which would slice text at the wrong position or
	// past its end.
	folded := []byte(text)
	for i, c := range folded {
		if 'A' <= c && c <= 'Z' {
			folded[i] = c + ('a' - 'A')
		}
	}
	lower := string(folded)

	for _, prefix := range prefixes {
		if idx := strings.Index(lower, strings.ToLower(prefix)); idx >= 0 {
			text = strings.TrimSpace(text[idx+len(prefix):])
			break
		}
	}
	return strings.Trim(text, " \t\r\n\"'`")
}
// normalizeAnthropicRequest converts an Anthropic-style request into a
// service.ChatRequest.
//
// User messages contribute their text and images as one message, followed by
// one "tool" message per non-blank tool result. Assistant messages contribute
// their text and tool calls. Messages with any other role, and messages with
// nothing to contribute, are dropped. An error is returned when no message
// remains. The tool choice defaults to mode "auto" when the request has none.
func normalizeAnthropicRequest(req anthropicRequest) (service.ChatRequest, error) {
	history := make([]service.ChatMessage, 0, len(req.Messages))
	for _, raw := range req.Messages {
		role := strings.ToLower(strings.TrimSpace(raw.Role))

		if role == "user" {
			text, results := extractAnthropicUserContent(raw.Content)
			images := extractAnthropicImages(raw.Content)
			if text != "" || len(images) > 0 {
				history = append(history, service.ChatMessage{Role: role, Text: text, Images: images})
			}
			for _, result := range results {
				if strings.TrimSpace(result.Content) == "" {
					continue
				}
				history = append(history, service.ChatMessage{
					Role:       "tool",
					Text:       result.Content,
					ToolCallID: result.ToolUseID,
				})
			}
			continue
		}

		if role == "assistant" {
			text, calls := extractAnthropicAssistantContent(raw.Content)
			if text != "" || len(calls) > 0 {
				history = append(history, service.ChatMessage{Role: role, Text: text, ToolCalls: calls})
			}
		}
	}
	if len(history) == 0 {
		return service.ChatRequest{}, fmt.Errorf("no user or assistant messages found")
	}

	choice := toolemulation.ToolChoice{Mode: "auto"}
	if req.ToolChoice != nil {
		choice = toolemulation.ExtractAnthropicToolChoice(req.ToolChoice)
	}

	normalized := service.ChatRequest{
		Model:       strings.TrimSpace(req.Model),
		System:      strings.TrimSpace(extractText(req.System)),
		Messages:    history,
		Tools:       toolemulation.ExtractAnthropicTools(req.Tools),
		ToolChoice:  choice,
		Temperature: req.Temperature,
		TopP:        req.TopP,
		TopK:        req.TopK,
		Stop:        req.StopSequences,
		MaxTokens:   req.MaxTokens,
	}
	return normalized, nil
}
// normalizeOpenAIRequest converts an OpenAI chat-completions request into a
// service.ChatRequest.
//
// "system" and "developer" messages are collected and joined with blank lines
// into the system prompt. "user" messages keep their text and images,
// "assistant" messages their text and tool calls, and "tool" messages are kept
// only when they have both output text and a tool call ID. Other roles and
// empty messages are dropped; an error is returned when nothing remains.
func normalizeOpenAIRequest(req openAIChatRequest) (service.ChatRequest, error) {
	var (
		history = make([]service.ChatMessage, 0, len(req.Messages))
		system  = make([]string, 0, 2)
	)
	for _, raw := range req.Messages {
		role := strings.ToLower(strings.TrimSpace(raw.Role))
		body := strings.TrimSpace(extractText(raw.Content))

		switch role {
		case "system", "developer":
			if body != "" {
				system = append(system, body)
			}
		case "user":
			if images := extractOpenAIImages(raw.Content); body != "" || len(images) > 0 {
				history = append(history, service.ChatMessage{Role: role, Text: body, Images: images})
			}
		case "assistant":
			if calls := extractOpenAIToolCalls(raw.ToolCalls); body != "" || len(calls) > 0 {
				history = append(history, service.ChatMessage{Role: role, Text: body, ToolCalls: calls})
			}
		case "tool":
			if body != "" && raw.ToolCallID != "" {
				history = append(history, service.ChatMessage{Role: "tool", Text: body, ToolCallID: raw.ToolCallID})
			}
		}
	}
	if len(history) == 0 {
		return service.ChatRequest{}, fmt.Errorf("no user or assistant messages found")
	}

	normalized := service.ChatRequest{
		Model:             strings.TrimSpace(req.Model),
		System:            strings.Join(system, "\n\n"),
		Messages:          history,
		Tools:             toolemulation.ExtractTools(req.Tools),
		ToolChoice:        toolemulation.ExtractToolChoice(req.ToolChoice),
		ParallelToolCalls: req.ParallelToolCalls,
		Temperature:       req.Temperature,
		TopP:              req.TopP,
		Stop:              extractStop(req.Stop),
		PresencePenalty:   req.PresencePenalty,
		FrequencyPenalty:  req.FrequencyPenalty,
		MaxTokens:         maxTokens(req.MaxTokens, req.MaxCompletionTokens),
		Seed:              req.Seed,
		User:              req.User,
		ReasoningEffort:   req.ReasoningEffort,
		ResponseFormat:    extractResponseFormat(req.ResponseFormat),
	}
	return normalized, nil
}
// extractStop converts the "stop" request value into a string slice.
//
// A non-empty string becomes a one-element slice, a []any keeps its non-empty
// string items (the result is non-nil even when empty), and a []string is
// returned as is. Anything else, including nil and "", yields nil.
func extractStop(stop any) []string {
	if list, ok := stop.([]any); ok {
		stops := make([]string, 0, len(list))
		for _, entry := range list {
			if text := stringFromAny(entry); text != "" {
				stops = append(stops, text)
			}
		}
		return stops
	}
	if list, ok := stop.([]string); ok {
		return list
	}
	if single, ok := stop.(string); ok && single != "" {
		return []string{single}
	}
	return nil
}
// extractResponseFormat returns the "type" string of a response_format
// object, or "" when rf is nil, is not an object, or has no string type.
func extractResponseFormat(rf any) string {
	if fields, ok := rf.(map[string]any); ok {
		return stringFromAny(fields["type"])
	}
	return ""
}
// maxTokens picks the token limit to use: b when it is positive, otherwise a.
func maxTokens(a, b int) int {
	if b <= 0 {
		return a
	}
	return b
}
// extractText pulls plain text out of a decoded JSON content value.
//
// A string is returned trimmed. A list yields the non-empty text of its items
// joined with newlines. An object yields its "text" value, else its
// "input_text" value, else the text extracted from its "content" value.
// Every other kind of value, including nil, yields "".
func extractText(content any) string {
	if text, ok := content.(string); ok {
		return strings.TrimSpace(text)
	}

	if list, ok := content.([]any); ok {
		pieces := make([]string, 0, len(list))
		for _, entry := range list {
			if piece := extractText(entry); piece != "" {
				pieces = append(pieces, piece)
			}
		}
		return strings.Join(pieces, "\n")
	}

	if fields, ok := content.(map[string]any); ok {
		for _, key := range [...]string{"text", "input_text"} {
			if text := stringFromAny(fields[key]); text != "" {
				return text
			}
		}
		return extractText(fields["content"])
	}

	return ""
}
// stringFromAny returns value with surrounding whitespace removed when it is
// a string, and "" for nil or any other type.
func stringFromAny(value any) string {
	text, isString := value.(string)
	if !isString {
		return ""
	}
	return strings.TrimSpace(text)
}
// decodeJSON decodes the request body as JSON into out and closes the body.
// Numbers decoded into interface values are kept as json.Number. A decoding
// failure is returned wrapped with the prefix "invalid JSON body: ".
func decodeJSON(r *http.Request, out any) error {
	defer r.Body.Close()

	dec := json.NewDecoder(r.Body)
	dec.UseNumber()
	err := dec.Decode(out)
	if err == nil {
		return nil
	}
	return fmt.Errorf("invalid JSON body: %w", err)
}
// writeJSON sends data as a JSON response with the given status code and a
// Content-Type of application/json.
func writeJSON(w http.ResponseWriter, status int, data any) {
	header := w.Header()
	header.Set("Content-Type", "application/json")
	w.WriteHeader(status)

	// The status line has already gone out, so an encoding failure cannot be
	// reported to the client; the error is deliberately dropped.
	enc := json.NewEncoder(w)
	_ = enc.Encode(data)
}
// writeAnthropicError sends an Anthropic-style error response:
// {"type":"error","error":{"type":kind,"message":message}}.
func writeAnthropicError(w http.ResponseWriter, status int, kind string, message string) {
	detail := map[string]any{
		"type":    kind,
		"message": message,
	}
	envelope := map[string]any{
		"type":  "error",
		"error": detail,
	}
	writeJSON(w, status, envelope)
}
// writeOpenAIError sends an OpenAI-style error response whose "error" object
// carries message and type, with "code" and "param" set to null.
func writeOpenAIError(w http.ResponseWriter, status int, kind string, message string) {
	detail := map[string]any{
		"message": message,
		"type":    kind,
		"code":    nil,
		"param":   nil,
	}
	envelope := map[string]any{"error": detail}
	writeJSON(w, status, envelope)
}
// streamingHeaders sets the response headers for a server-sent event stream
// and writes a 200 status line.
func streamingHeaders(w http.ResponseWriter) {
	header := w.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	header.Set("X-Accel-Buffering", "no")
	w.WriteHeader(http.StatusOK)
}
// writeSSEEvent writes one named server-sent event whose data line is the
// JSON encoding of payload, then flushes. Nothing is written when payload
// cannot be encoded; the first encoding or write error is returned.
func writeSSEEvent(w http.ResponseWriter, flusher http.Flusher, event string, payload any) error {
	body, err := json.Marshal(payload)
	if err != nil {
		return err
	}

	_, err = fmt.Fprintf(w, "event: %s\n", event)
	if err == nil {
		_, err = fmt.Fprintf(w, "data: %s\n\n", body)
	}
	if err != nil {
		return err
	}

	flusher.Flush()
	return nil
}
// writeOpenAIChunk writes payload as one unnamed server-sent event
// ("data: <json>" followed by a blank line) and flushes. Nothing is written
// when payload cannot be encoded; the encoding or write error is returned.
func writeOpenAIChunk(w http.ResponseWriter, flusher http.Flusher, payload any) error {
	encoded, err := json.Marshal(payload)
	if err != nil {
		return err
	}
	_, err = fmt.Fprintf(w, "data: %s\n\n", encoded)
	if err != nil {
		return err
	}
	flusher.Flush()
	return nil
}
// recordingResponseWriter wraps an http.ResponseWriter and keeps a copy of the
// status code and body bytes that pass through it.
type recordingResponseWriter struct {
http.ResponseWriter
// statusCode is the status recorded by WriteHeader.
statusCode int
// body accumulates every byte slice passed to Write.
body []byte
// wrote is set by WriteHeader; Write checks it to decide whether to send an
// implicit 200 status first.
wrote bool
}
// WriteHeader records the response status and forwards the call to the
// wrapped writer.
//
// Only the first final status is recorded: net/http sends the first final
// status it is given and ignores later ones, so overwriting the recorded
// value on a repeated call would log a status the client never saw.
// Informational 1xx codes (other than 101 Switching Protocols) may precede
// the final status, so they are forwarded without being recorded.
func (rw *recordingResponseWriter) WriteHeader(code int) {
	informational := code >= 100 && code < 200 && code != http.StatusSwitchingProtocols
	if !rw.wrote && !informational {
		rw.statusCode = code
		rw.wrote = true
	}
	rw.ResponseWriter.WriteHeader(code)
}
// Write appends b to the recorded body and forwards it to the wrapped writer.
// If no header has been written yet, a 200 status is written first through
// WriteHeader so that it is recorded as well.
func (rw *recordingResponseWriter) Write(b []byte) (int, error) {
	if headerPending := !rw.wrote; headerPending {
		rw.WriteHeader(http.StatusOK)
	}
	rw.body = append(rw.body, b...)
	written, err := rw.ResponseWriter.Write(b)
	return written, err
}
// Flush forwards to the wrapped writer's Flush when it implements
// http.Flusher and is a no-op otherwise.
func (rw *recordingResponseWriter) Flush() {
	inner, canFlush := rw.ResponseWriter.(http.Flusher)
	if !canFlush {
		return
	}
	inner.Flush()
}
// withRecorder wraps next so that every request, except those to the debug
// inspection paths, is stored in the server's request log together with its
// status, duration and sanitized request/response bodies. When OnRequest is
// set it is invoked with the same data on a separate goroutine.
func (s *Server) withRecorder(next http.Handler) http.Handler {
	record := func(w http.ResponseWriter, r *http.Request) {
		if isDebugInspectionPath(r.URL.Path) {
			next.ServeHTTP(w, r)
			return
		}

		begin := time.Now()

		// Drain the body for the log, then hand the handler a fresh reader
		// over the same bytes. A read error is ignored; whatever was read is
		// what gets logged and forwarded.
		reqBody := ""
		if r.Body != nil && r.Body != http.NoBody {
			raw, _ := io.ReadAll(r.Body)
			r.Body = io.NopCloser(bytes.NewReader(raw))
			reqBody = sanitizeRecordedBody(raw)
		}

		recorder := &recordingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK}
		next.ServeHTTP(recorder, r)

		elapsed := time.Since(begin)
		respBody := sanitizeRecordedBody(recorder.body)
		s.recordRequest(r.Method, r.URL.Path, recorder.statusCode, elapsed, reqBody, respBody)
		if s.OnRequest != nil {
			go s.OnRequest(r.Method, r.URL.Path, recorder.statusCode, elapsed, reqBody, respBody)
		}
	}
	return http.HandlerFunc(record)
}
// isDebugInspectionPath reports whether path is exactly one of the request or
// log inspection endpoints.
func isDebugInspectionPath(path string) bool {
	for _, known := range [...]string{
		"/debug/requests",
		"/debug/logs",
		"/api/requests",
		"/api/logs",
	} {
		if path == known {
			return true
		}
	}
	return false
}
// recordRequest appends one entry, stamped with the current time in RFC 3339
// format, to the request log under the write lock, and then drops the oldest
// entries so that at most 200 remain.
func (s *Server) recordRequest(method, path string, statusCode int, duration time.Duration, reqBody, respBody string) {
	const keep = 200

	s.recMu.Lock()
	defer s.recMu.Unlock()

	entry := debugRequestRecord{
		Time:       time.Now().Format(time.RFC3339),
		Method:     method,
		Path:       path,
		StatusCode: statusCode,
		DurationMS: duration.Milliseconds(),
		Request:    reqBody,
		Response:   respBody,
	}
	s.records = append(s.records, entry)
	if excess := len(s.records) - keep; excess > 0 {
		s.records = s.records[excess:]
	}
}
// debugRecords returns up to limit of the most recent request records, newest
// first, as a copy taken under the read lock. A limit larger than the log is
// reduced to the log size; a negative limit is treated as zero and yields an
// empty result.
func (s *Server) debugRecords(limit int) []debugRequestRecord {
	s.recMu.RLock()
	defer s.recMu.RUnlock()
	if limit > len(s.records) {
		limit = len(s.records)
	}
	// make panics on a negative capacity, so clamp before allocating.
	if limit < 0 {
		limit = 0
	}
	out := make([]debugRequestRecord, 0, limit)
	for i := len(s.records) - 1; i >= 0 && len(out) < limit; i-- {
		out = append(out, s.records[i])
	}
	return out
}
func sanitizeRecordedBody(body []byte) string {
if len(body) == 0 {
return ""
}
var value any
if err := json.Unmarshal(body, &value); err != nil {
return truncateRecordedString(string(body))
}
return truncateRecordedString(string(mustMarshalJSON(redactRecordedValue(value))))
}
func redactRecordedValue(value any) any {
switch typed := value.(type) {
case map[string]any:
out := make(map[string]any, len(typed))
for k, v := range typed {
lower := strings.ToLower(k)
if lower == "data" || lower == "url" {
if s := stringFromAny(v); looksLikeImagePayload(s) {
out[k] = imageRedaction(s)
continue
}
}
out[k] = redactRecordedValue(v)
}
return out
case []any:
out := make([]any, 0, len(typed))
for _, item := range typed {
out = append(out, redactRecordedValue(item))
}
return out
case string:
if looksLikeImagePayload(typed) {
return imageRedaction(typed)
}
return typed
default:
return typed
}
}
// looksLikeImagePayload reports whether value, after trimming, is a
// "data:image/" URL or is longer than 4096 bytes and consists only of base64
// characters.
func looksLikeImagePayload(value string) bool {
	trimmed := strings.TrimSpace(value)
	return strings.HasPrefix(trimmed, "data:image/") ||
		(len(trimmed) > 4096 && isLikelyBase64(trimmed))
}
// imageRedaction returns the placeholder stored in the debug log in place of
// an image payload; it states the payload's length in bytes.
func imageRedaction(value string) string {
	size := len(value)
	return fmt.Sprintf("[image payload redacted, %d chars]", size)
}
// isLikelyBase64 reports whether value contains only characters of the
// standard base64 alphabet, '=' padding, and CR/LF line breaks. The empty
// string qualifies.
func isLikelyBase64(value string) bool {
	for _, r := range value {
		switch {
		case 'A' <= r && r <= 'Z', 'a' <= r && r <= 'z', '0' <= r && r <= '9':
			// alphanumeric
		case r == '+', r == '/', r == '=', r == '\n', r == '\r':
			// symbols, padding and line breaks
		default:
			return false
		}
	}
	return true
}
// mustMarshalJSON returns the JSON encoding of value. Despite the name it
// does not panic: when encoding fails it returns the literal "{}".
func mustMarshalJSON(value any) []byte {
	if encoded, err := json.Marshal(value); err == nil {
		return encoded
	}
	return []byte("{}")
}
// truncateRecordedString is called by sanitizeRecordedBody on the text it is
// about to return. It currently returns value unchanged: no truncation is
// performed.
// NOTE(review): the name suggests a length cap was intended — confirm whether
// storing bodies at full length is deliberate.
func truncateRecordedString(value string) string {
return value
}
// withCORS wraps next with permissive CORS handling: every response gets the
// Access-Control-Allow-* headers, and OPTIONS requests are answered directly
// with 204 No Content without reaching next.
func withCORS(next http.Handler) http.Handler {
	handle := func(w http.ResponseWriter, r *http.Request) {
		header := w.Header()
		header.Set("Access-Control-Allow-Origin", "*")
		header.Set("Access-Control-Allow-Methods", "GET, HEAD, POST, OPTIONS")
		header.Set("Access-Control-Allow-Headers", "Content-Type, Authorization, x-api-key, anthropic-version")

		if r.Method != http.MethodOptions {
			next.ServeHTTP(w, r)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	}
	return http.HandlerFunc(handle)
}
// maxConcurrentRequests reads LINGMA_PROXY_MAX_CONCURRENT and returns it as
// the concurrency limit. The value is capped at 16; when it is unset, not a
// number, or below 1 the default of 4 is returned.
func maxConcurrentRequests() int {
	const (
		fallback = 4
		ceiling  = 16
	)
	// An unset or blank variable fails to parse and takes the fallback too.
	limit, err := strconv.Atoi(strings.TrimSpace(os.Getenv("LINGMA_PROXY_MAX_CONCURRENT")))
	switch {
	case err != nil, limit < 1:
		return fallback
	case limit > ceiling:
		return ceiling
	}
	return limit
}
// acquire blocks until a value can be sent on s.sem or ctx is done. It returns
// true when the send succeeded and false when ctx finished first.
// NOTE(review): presumably s.sem is a buffered channel sized to the
// concurrency limit — its construction is not visible here; confirm.
func (s *Server) acquire(ctx context.Context) bool {
select {
case s.sem <- struct{}{}:
return true
case <-ctx.Done():
return false
}
}
// release receives one value from s.sem if one is immediately available and
// otherwise does nothing, so it never blocks.
func (s *Server) release() {
select {
case <-s.sem:
default:
}
}
// extractOpenAIToolCalls converts the decoded "tool_calls" list of an OpenAI
// assistant message into tool calls. Entries that are not objects, lack a
// "function" object, or have no function name are skipped. The function's
// "arguments" string is parsed as a JSON object; a parse failure is ignored
// and leaves whatever was decoded. Empty input yields nil.
func extractOpenAIToolCalls(raw []any) []toolemulation.ToolCall {
	if len(raw) == 0 {
		return nil
	}
	calls := make([]toolemulation.ToolCall, 0, len(raw))
	for _, entry := range raw {
		fields, isMap := entry.(map[string]any)
		if !isMap {
			continue
		}
		function, hasFunction := fields["function"].(map[string]any)
		if !hasFunction {
			continue
		}
		name := stringFromAny(function["name"])
		if name == "" {
			continue
		}

		var args map[string]any
		if encoded := stringFromAny(function["arguments"]); encoded != "" {
			_ = json.Unmarshal([]byte(encoded), &args)
		}
		calls = append(calls, toolemulation.ToolCall{
			ID:        stringFromAny(fields["id"]),
			Name:      name,
			Arguments: args,
		})
	}
	return calls
}
// anthropicToolResult is one tool_result block pulled out of an Anthropic user
// message by extractAnthropicUserContent.
type anthropicToolResult struct {
// ToolUseID is the block's "tool_use_id" value.
ToolUseID string
// Content is the text extracted from the block's "content" value.
Content string
}
// extractAnthropicUserContent splits the content of an Anthropic user message
// into its text and its tool results.
//
// Content that is not a list is treated as plain text. In a list, "text"
// blocks are joined with newlines, "tool_result" blocks with non-empty text
// become tool results, and all other blocks (including thinking blocks) are
// ignored.
func extractAnthropicUserContent(content any) (string, []anthropicToolResult) {
	blocks, isList := content.([]any)
	if !isList {
		return extractText(content), nil
	}

	var (
		texts   []string
		results []anthropicToolResult
	)
	for _, block := range blocks {
		fields, isMap := block.(map[string]any)
		if !isMap {
			continue
		}
		kind := stringFromAny(fields["type"])
		if kind == "text" {
			if text := stringFromAny(fields["text"]); text != "" {
				texts = append(texts, text)
			}
		} else if kind == "tool_result" {
			id := stringFromAny(fields["tool_use_id"])
			body := extractText(fields["content"])
			if body == "" {
				continue
			}
			results = append(results, anthropicToolResult{ToolUseID: id, Content: body})
		}
	}
	return strings.Join(texts, "\n"), results
}
// extractAnthropicAssistantContent splits the content of an Anthropic
// assistant message into its text and its tool calls.
//
// Content that is not a list is treated as plain text. In a list, "text"
// blocks are joined with newlines and named "tool_use" blocks become tool
// calls; thinking blocks and anything else are ignored. A tool_use "input"
// may be an object or a JSON string; a string that fails to parse yields
// empty arguments.
func extractAnthropicAssistantContent(content any) (string, []toolemulation.ToolCall) {
	blocks, isList := content.([]any)
	if !isList {
		return extractText(content), nil
	}

	var texts []string
	calls := make([]toolemulation.ToolCall, 0, len(blocks))
	for _, block := range blocks {
		fields, isMap := block.(map[string]any)
		if !isMap {
			continue
		}
		kind := stringFromAny(fields["type"])
		if kind == "text" {
			if text := stringFromAny(fields["text"]); text != "" {
				texts = append(texts, text)
			}
			continue
		}
		if kind != "tool_use" {
			continue
		}

		id := stringFromAny(fields["id"])
		name := stringFromAny(fields["name"])
		if name == "" {
			continue
		}
		var args map[string]any
		switch input := fields["input"].(type) {
		case map[string]any:
			args = input
		case string:
			if input != "" {
				if err := json.Unmarshal([]byte(input), &args); err != nil {
					args = map[string]any{}
				}
			}
		}
		calls = append(calls, toolemulation.ToolCall{ID: id, Name: name, Arguments: args})
	}
	return strings.Join(texts, "\n"), calls
}
// extractOpenAIImages collects the images referenced by "image_url" parts of
// an OpenAI message content list. Each part's image_url.url is resolved with
// parseImageURL; parts that are malformed or cannot be resolved are skipped.
// Content that is not a list yields nil.
func extractOpenAIImages(content any) []service.Image {
	parts, isList := content.([]any)
	if !isList {
		return nil
	}
	var images []service.Image
	for _, part := range parts {
		fields, isMap := part.(map[string]any)
		if !isMap || stringFromAny(fields["type"]) != "image_url" {
			continue
		}
		wrapper, isMap := fields["image_url"].(map[string]any)
		if !isMap {
			continue
		}
		ref := stringFromAny(wrapper["url"])
		if ref == "" {
			continue
		}
		if parsed := parseImageURL(ref); parsed != nil {
			images = append(images, *parsed)
		}
	}
	return images
}
// extractAnthropicImages collects the base64 images carried by "image" blocks
// of an Anthropic message content list. Blocks whose source is missing, is
// not of type "base64", or has no data are skipped. Content that is not a
// list yields nil.
func extractAnthropicImages(content any) []service.Image {
	blocks, isList := content.([]any)
	if !isList {
		return nil
	}
	var images []service.Image
	for _, block := range blocks {
		fields, isMap := block.(map[string]any)
		if !isMap || stringFromAny(fields["type"]) != "image" {
			continue
		}
		source, isMap := fields["source"].(map[string]any)
		if !isMap || stringFromAny(source["type"]) != "base64" {
			continue
		}
		mediaType := stringFromAny(source["media_type"])
		payload := stringFromAny(source["data"])
		if payload == "" {
			continue
		}
		images = append(images, service.Image{MediaType: mediaType, Data: payload})
	}
	return images
}
// parseImageURL resolves an image reference into a normalized image. "data:"
// URLs are decoded in place, references accepted by parseLocalImagePath are
// read from disk, and everything else is fetched over HTTP. It returns nil
// when the reference cannot be resolved.
func parseImageURL(url string) *service.Image {
	var img *service.Image
	if strings.HasPrefix(url, "data:") {
		img = parseDataURL(url)
	} else if img = parseLocalImagePath(url); img == nil {
		fetched, err := fetchImageAsBase64(url)
		if err != nil {
			return nil
		}
		img = fetched
	}
	return normalizeImage(img)
}
// parseLocalImagePath loads an image from the local filesystem when raw is a
// file:// URL, a path starting with "~" (expanded with the user's home
// directory), or a path starting with "/". The file contents are returned
// base64-encoded with a media type guessed from the file name. It returns nil
// for any other reference and when the file is missing, unreadable or empty.
//
// NOTE(review): raw originates from the request's image URL, so this reads
// whatever path the client names — confirm that exposure is intended for the
// deployment.
func parseLocalImagePath(raw string) *service.Image {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return nil
	}

	location := raw
	if strings.HasPrefix(raw, "file://") {
		parsed, err := url.Parse(raw)
		if err != nil {
			return nil
		}
		location = parsed.Path
	}
	if strings.HasPrefix(location, "~") {
		home, err := os.UserHomeDir()
		if err != nil {
			return nil
		}
		location = home + location[1:]
	}
	if !strings.HasPrefix(location, "/") {
		return nil
	}

	contents, err := os.ReadFile(location)
	if err != nil || len(contents) == 0 {
		return nil
	}
	return &service.Image{
		MediaType: mediaTypeForImagePath(location),
		Data:      base64.StdEncoding.EncodeToString(contents),
		URL:       raw,
	}
}
// mediaTypeForImagePath guesses an image media type from the file name's
// extension, ignoring case. Unrecognized names default to "image/jpeg".
func mediaTypeForImagePath(path string) string {
	lower := strings.ToLower(path)
	known := [...]struct{ suffix, mediaType string }{
		{".png", "image/png"},
		{".gif", "image/gif"},
		{".webp", "image/webp"},
		{".bmp", "image/bmp"},
	}
	for _, entry := range known {
		if strings.HasSuffix(lower, entry.suffix) {
			return entry.mediaType
		}
	}
	return "image/jpeg"
}
// parseDataURL splits a "data:" URL at its first comma. The part before the
// comma, minus a trailing ";base64", becomes the media type and the part
// after it is kept verbatim as the data. It returns nil when url is not a
// data URL or has no comma.
func parseDataURL(url string) *service.Image {
	const scheme = "data:"
	if !strings.HasPrefix(url, scheme) {
		return nil
	}
	rest := url[len(scheme):]
	comma := strings.IndexByte(rest, ',')
	if comma < 0 {
		return nil
	}
	header, payload := rest[:comma], rest[comma+1:]
	return &service.Image{
		MediaType: strings.TrimSuffix(header, ";base64"),
		Data:      payload,
	}
}
// fetchImageAsBase64 downloads url with an HTTP GET and returns the body
// base64-encoded. The media type is taken from the Content-Type header with
// any parameters removed, defaulting to "image/jpeg" when the header is
// absent.
//
// The URL comes from the client's request, so the download is bounded: the
// whole exchange must finish within 30 seconds and bodies larger than 20 MiB
// are rejected instead of being buffered in memory. An error is returned for
// transport failures, non-200 responses, and oversized bodies.
func fetchImageAsBase64(url string) (*service.Image, error) {
	const (
		fetchTimeout        = 30 * time.Second
		maxRemoteImageBytes = 20 << 20
	)

	// The default client used by http.Get has no timeout; a stalled server
	// would otherwise hold the request open indefinitely.
	client := &http.Client{Timeout: fetchTimeout}
	resp, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("fetch image failed: %s", resp.Status)
	}

	// Read one byte past the limit so an oversized body can be told apart
	// from one that is exactly at the limit.
	data, err := io.ReadAll(io.LimitReader(resp.Body, maxRemoteImageBytes+1))
	if err != nil {
		return nil, err
	}
	if len(data) > maxRemoteImageBytes {
		return nil, fmt.Errorf("fetch image failed: body exceeds %d bytes", maxRemoteImageBytes)
	}

	mediaType := resp.Header.Get("Content-Type")
	if mediaType == "" {
		mediaType = "image/jpeg"
	} else if idx := strings.Index(mediaType, ";"); idx >= 0 {
		// Strip parameters like "image/png; charset=utf-8".
		mediaType = strings.TrimSpace(mediaType[:idx])
	}
	return &service.Image{
		MediaType: mediaType,
		Data:      base64.StdEncoding.EncodeToString(data),
	}, nil
}
// normalizeImage shrinks an image that is too large to pass on as is.
//
// An image of at most 2 MiB whose sides are both at most 1568 pixels is
// returned untouched. Otherwise it is decoded, scaled so its longer side is
// at most 1568 pixels, re-encoded as JPEG at quality 85, and img is updated
// in place. Whenever a step fails (nil or empty image, invalid base64,
// undecodable format, encoding error) img is returned unchanged.
func normalizeImage(img *service.Image) *service.Image {
	const (
		maxImageBytes = 2 * 1024 * 1024
		maxImageSide  = 1568
	)
	if img == nil || strings.TrimSpace(img.Data) == "" {
		return img
	}
	raw, err := base64.StdEncoding.DecodeString(img.Data)
	if err != nil || len(raw) == 0 {
		return img
	}

	// Small enough in bytes: only the header is needed to check dimensions.
	if len(raw) <= maxImageBytes {
		cfg, _, cfgErr := image.DecodeConfig(bytes.NewReader(raw))
		if cfgErr == nil && cfg.Width <= maxImageSide && cfg.Height <= maxImageSide {
			return img
		}
	}

	decoded, _, err := image.Decode(bytes.NewReader(raw))
	if err != nil {
		return img
	}
	size := decoded.Bounds().Size()
	if size.X <= 0 || size.Y <= 0 {
		return img
	}

	targetWidth, targetHeight := scaledDimensions(size.X, size.Y, maxImageSide)
	resized := resizeNearest(decoded, targetWidth, targetHeight)
	var encoded bytes.Buffer
	if err := jpeg.Encode(&encoded, resized, &jpeg.Options{Quality: 85}); err != nil {
		return img
	}
	img.MediaType = "image/jpeg"
	img.Data = base64.StdEncoding.EncodeToString(encoded.Bytes())
	return img
}
// resizeNearest returns a width x height RGBA copy of src in which every
// destination pixel takes the colour of the source pixel at the
// proportionally scaled position, with no interpolation.
func resizeNearest(src image.Image, width int, height int) *image.RGBA {
	out := image.NewRGBA(image.Rect(0, 0, width, height))
	srcBounds := src.Bounds()
	origin, size := srcBounds.Min, srcBounds.Size()
	for y := 0; y < height; y++ {
		srcY := origin.Y + y*size.Y/height
		for x := 0; x < width; x++ {
			out.Set(x, y, src.At(origin.X+x*size.X/width, srcY))
		}
	}
	return out
}
// scaledDimensions returns the size to which a width x height image should be
// scaled so that neither side exceeds maxSide while keeping the aspect ratio.
// Sizes already within the limit are returned unchanged; otherwise the longer
// side becomes maxSide and the shorter side is scaled down, never below 1.
func scaledDimensions(width int, height int, maxSide int) (int, int) {
	if width <= maxSide && height <= maxSide {
		return width, height
	}
	atLeastOne := func(side int) int {
		if side < 1 {
			return 1
		}
		return side
	}
	if height > width {
		return atLeastOne(width * maxSide / height), maxSide
	}
	return maxSide, atLeastOne(height * maxSide / width)
}