Add experimental Lingma remote backend

This commit is contained in:
lutc5
2026-04-30 12:09:51 +08:00
parent 1c188fcf17
commit 2bcb0a6715
15 changed files with 1543 additions and 37 deletions

View File

@@ -16,6 +16,7 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"
"lingma-ipc-proxy/internal/service"
@@ -23,9 +24,11 @@ import (
)
type Server struct {
svc *service.Service
http *http.Server
sem chan struct{}
svc *service.Service
http *http.Server
sem chan struct{}
recMu sync.RWMutex
records []debugRequestRecord
// OnRequest is called after each request completes with summary info.
// method, path, statusCode, duration, requestBody, responseBody
OnRequest func(method, path string, statusCode int, duration time.Duration, reqBody, respBody string)
@@ -84,6 +87,16 @@ type modelResponse struct {
Name string `json:"name,omitempty"`
}
type debugRequestRecord struct {
Time string `json:"time"`
Method string `json:"method"`
Path string `json:"path"`
StatusCode int `json:"statusCode"`
DurationMS int64 `json:"durationMs"`
Request string `json:"request,omitempty"`
Response string `json:"response,omitempty"`
}
func NewServer(addr string, svc *service.Service) *Server {
s := &Server{
svc: svc,
@@ -92,6 +105,10 @@ func NewServer(addr string, svc *service.Service) *Server {
mux := http.NewServeMux()
mux.HandleFunc("/", s.handleRoot)
mux.HandleFunc("/health", s.handleRoot)
mux.HandleFunc("/debug/requests", s.handleDebugRequests)
mux.HandleFunc("/debug/logs", s.handleDebugRequests)
mux.HandleFunc("/api/requests", s.handleDebugRequests)
mux.HandleFunc("/api/logs", s.handleDebugRequests)
mux.HandleFunc("/capabilities", s.handleCapabilities)
mux.HandleFunc("/v1/capabilities", s.handleCapabilities)
mux.HandleFunc("/v1/models", s.handleModels)
@@ -151,6 +168,10 @@ func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
return
}
if r.Method == http.MethodHead {
w.WriteHeader(http.StatusOK)
return
}
if r.Method != http.MethodGet {
writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
return
@@ -162,6 +183,44 @@ func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
})
}
func (s *Server) handleDebugRequests(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)
return
}
if r.Method == http.MethodHead {
w.WriteHeader(http.StatusOK)
return
}
if r.Method != http.MethodGet {
writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
return
}
limit := 50
if raw := strings.TrimSpace(r.URL.Query().Get("limit")); raw != "" {
if parsed, err := strconv.Atoi(raw); err == nil {
switch {
case parsed < 1:
limit = 1
case parsed > 200:
limit = 200
default:
limit = parsed
}
}
}
records := s.debugRecords(limit)
writeJSON(w, http.StatusOK, map[string]any{
"ok": true,
"service": "lingma-ipc-proxy",
"count": len(records),
"requests": records,
"state": s.svc.State(),
})
}
func (s *Server) handleModels(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)
@@ -552,6 +611,101 @@ func (s *Server) handleAnthropicStream(w http.ResponseWriter, r *http.Request, r
}
msgID := fmt.Sprintf("msg_%d", time.Now().UnixNano())
if len(req.Tools) > 0 {
result, err := s.svc.Generate(r.Context(), req)
if err != nil {
writeAnthropicError(w, http.StatusInternalServerError, "api_error", err.Error())
return
}
streamingHeaders(w)
if err := writeSSEEvent(w, flusher, "message_start", map[string]any{
"type": "message_start",
"message": map[string]any{
"id": msgID,
"type": "message",
"role": "assistant",
"content": []any{},
"model": model,
"stop_reason": nil,
"stop_sequence": nil,
"usage": map[string]any{
"input_tokens": result.InputTokens,
"output_tokens": 0,
},
},
}); err != nil {
return
}
index := 0
if strings.TrimSpace(result.Text) != "" {
if err := writeSSEEvent(w, flusher, "content_block_start", map[string]any{
"type": "content_block_start",
"index": index,
"content_block": map[string]any{"type": "text", "text": ""},
}); err != nil {
return
}
if err := writeSSEEvent(w, flusher, "content_block_delta", map[string]any{
"type": "content_block_delta",
"index": index,
"delta": map[string]any{"type": "text_delta", "text": result.Text},
}); err != nil {
return
}
if err := writeSSEEvent(w, flusher, "content_block_stop", map[string]any{
"type": "content_block_stop",
"index": index,
}); err != nil {
return
}
index++
}
for _, tc := range result.ToolCalls {
if err := writeSSEEvent(w, flusher, "content_block_start", map[string]any{
"type": "content_block_start",
"index": index,
"content_block": map[string]any{"type": "tool_use", "id": tc.ID, "name": tc.Name, "input": map[string]any{}},
}); err != nil {
return
}
argsJSON, _ := json.Marshal(tc.Arguments)
if err := writeSSEEvent(w, flusher, "content_block_delta", map[string]any{
"type": "content_block_delta",
"index": index,
"delta": map[string]any{"type": "input_json_delta", "partial_json": string(argsJSON)},
}); err != nil {
return
}
if err := writeSSEEvent(w, flusher, "content_block_stop", map[string]any{
"type": "content_block_stop",
"index": index,
}); err != nil {
return
}
index++
}
stopReason := "end_turn"
if len(result.ToolCalls) > 0 {
stopReason = "tool_use"
}
_ = writeSSEEvent(w, flusher, "message_delta", map[string]any{
"type": "message_delta",
"delta": map[string]any{
"stop_reason": stopReason,
"stop_sequence": nil,
},
"usage": map[string]any{
"output_tokens": result.OutputTokens,
},
})
_ = writeSSEEvent(w, flusher, "message_stop", map[string]any{"type": "message_stop"})
return
}
events, done, err := s.svc.GenerateStream(r.Context(), req)
if err != nil {
writeAnthropicError(w, http.StatusInternalServerError, "api_error", err.Error())
@@ -1141,10 +1295,11 @@ func (rw *recordingResponseWriter) Flush() {
func (s *Server) withRecorder(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if s.OnRequest == nil {
if isDebugInspectionPath(r.URL.Path) {
next.ServeHTTP(w, r)
return
}
start := time.Now()
// Read request body for recording, then restore for downstream handler
@@ -1161,10 +1316,54 @@ func (s *Server) withRecorder(next http.Handler) http.Handler {
respBody := sanitizeRecordedBody(rw.body)
go s.OnRequest(r.Method, r.URL.Path, rw.statusCode, duration, reqBody, respBody)
s.recordRequest(r.Method, r.URL.Path, rw.statusCode, duration, reqBody, respBody)
if s.OnRequest != nil {
go s.OnRequest(r.Method, r.URL.Path, rw.statusCode, duration, reqBody, respBody)
}
})
}
func isDebugInspectionPath(path string) bool {
switch path {
case "/debug/requests", "/debug/logs", "/api/requests", "/api/logs":
return true
default:
return false
}
}
func (s *Server) recordRequest(method, path string, statusCode int, duration time.Duration, reqBody, respBody string) {
s.recMu.Lock()
defer s.recMu.Unlock()
s.records = append(s.records, debugRequestRecord{
Time: time.Now().Format(time.RFC3339),
Method: method,
Path: path,
StatusCode: statusCode,
DurationMS: duration.Milliseconds(),
Request: reqBody,
Response: respBody,
})
if len(s.records) > 200 {
s.records = s.records[len(s.records)-200:]
}
}
func (s *Server) debugRecords(limit int) []debugRequestRecord {
s.recMu.RLock()
defer s.recMu.RUnlock()
if limit > len(s.records) {
limit = len(s.records)
}
out := make([]debugRequestRecord, 0, limit)
for i := len(s.records) - 1; i >= 0 && len(out) < limit; i-- {
out = append(out, s.records[i])
}
return out
}
func sanitizeRecordedBody(body []byte) string {
if len(body) == 0 {
return ""
@@ -1254,7 +1453,7 @@ func truncateRecordedString(value string) string {
func withCORS(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
w.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, POST, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, x-api-key, anthropic-version")
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)