commit 585c3ba5ab3150e85cdced1fb3a4341ca7128319 Author: coolxll Date: Wed Mar 25 21:35:19 2026 +0800 feat: add Lingma IPC proxy service diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1813f06 --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +bin/ +dist/ +*.exe +*.dll +*.so +*.dylib +*.test +*.out +coverage.* +.idea/ +.vscode/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..316204e --- /dev/null +++ b/README.md @@ -0,0 +1,87 @@ +# lingma-ipc-proxy + +A standalone Go backend that talks to Lingma over Windows named-pipe IPC and exposes: + +- `GET /v1/models` +- `POST /v1/messages` +- `POST /v1/chat/completions` + +Current scope: + +- non-streaming only +- one request at a time +- Windows only +- directly uses Lingma IPC, not DOM/CDP + +## Run + +```powershell +cd C:\Workspace\Personal\lingma-ipc-proxy +go run .\cmd\lingma-ipc-proxy +``` + +## Flags + +```powershell +go run .\cmd\lingma-ipc-proxy --port 8095 --session-mode auto +``` + +- `--host` +- `--port` +- `--pipe` +- `--cwd` +- `--current-file-path` +- `--mode` +- `--shell-type` +- `--session-mode` +- `--timeout` + +## Environment + +- `LINGMA_IPC_PIPE` +- `LINGMA_PROXY_HOST` +- `LINGMA_PROXY_PORT` +- `LINGMA_PROXY_CWD` +- `LINGMA_PROXY_CURRENT_FILE_PATH` +- `LINGMA_PROXY_MODE` +- `LINGMA_PROXY_SHELL_TYPE` +- `LINGMA_PROXY_SESSION_MODE` +- `LINGMA_PROXY_TIMEOUT_SECONDS` + +## Examples + +Anthropic: + +```powershell +$body = @{ + model = "dashscope_qwen3_coder" + messages = @( + @{ role = "user"; content = "请只回复:ANTHROPIC_OK" } + ) + stream = $false +} | ConvertTo-Json -Depth 8 + +Invoke-RestMethod ` + -Method Post ` + -Uri http://127.0.0.1:8095/v1/messages ` + -ContentType "application/json" ` + -Body $body +``` + +OpenAI: + +```powershell +$body = @{ + model = "dashscope_qwen3_coder" + messages = @( + @{ role = "user"; content = "请只回复:OPENAI_OK" } + ) + stream = $false +} | ConvertTo-Json -Depth 8 + +Invoke-RestMethod ` + -Method Post ` + -Uri 
http://127.0.0.1:8095/v1/chat/completions ` + -ContentType "application/json" ` + -Body $body +``` diff --git a/cmd/lingma-ipc-proxy/main.go b/cmd/lingma-ipc-proxy/main.go new file mode 100644 index 0000000..6c342fa --- /dev/null +++ b/cmd/lingma-ipc-proxy/main.go @@ -0,0 +1,117 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "strconv" + "strings" + "syscall" + "time" + + "lingma-ipc-proxy/internal/httpapi" + "lingma-ipc-proxy/internal/service" +) + +func main() { + cfg := loadConfig() + addr := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) + + svc := service.New(cfg) + server := httpapi.NewServer(addr, svc) + + log.Printf("lingma-ipc-proxy listening on http://%s", addr) + log.Printf("session mode: %s", cfg.SessionMode) + + errCh := make(chan error, 1) + go func() { + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + errCh <- err + } + }() + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + select { + case err := <-errCh: + log.Fatal(err) + case sig := <-sigCh: + log.Printf("received %s, shutting down", sig.String()) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := server.Shutdown(ctx); err != nil { + log.Fatal(err) + } +} + +func loadConfig() service.Config { + hostDefault := envString("LINGMA_PROXY_HOST", "127.0.0.1") + portDefault := envInt("LINGMA_PROXY_PORT", 8095) + pipeDefault := envString("LINGMA_IPC_PIPE", "") + cwdDefault := envString("LINGMA_PROXY_CWD", currentDir()) + currentFilePathDefault := envString("LINGMA_PROXY_CURRENT_FILE_PATH", "") + modeDefault := envString("LINGMA_PROXY_MODE", "agent") + shellTypeDefault := envString("LINGMA_PROXY_SHELL_TYPE", "powershell") + timeoutDefault := envInt("LINGMA_PROXY_TIMEOUT_SECONDS", 120) + sessionModeDefault := envString("LINGMA_PROXY_SESSION_MODE", string(service.SessionModeAuto)) + + host := flag.String("host", hostDefault, 
"Listen host") + port := flag.Int("port", portDefault, "Listen port") + pipe := flag.String("pipe", pipeDefault, "Explicit Lingma named pipe path") + cwd := flag.String("cwd", cwdDefault, "Working directory used when creating Lingma sessions") + currentFilePath := flag.String("current-file-path", currentFilePathDefault, "Current file path sent through ACP meta") + mode := flag.String("mode", modeDefault, "Lingma ACP mode value") + shellType := flag.String("shell-type", shellTypeDefault, "Shell type sent through ACP meta") + timeoutSeconds := flag.Int("timeout", timeoutDefault, "Per-request timeout in seconds") + sessionMode := flag.String("session-mode", sessionModeDefault, "Session mode: auto, fresh, reuse") + flag.Parse() + + parsedSessionMode := service.SessionMode(strings.ToLower(strings.TrimSpace(*sessionMode))) + switch parsedSessionMode { + case service.SessionModeAuto, service.SessionModeFresh, service.SessionModeReuse: + default: + log.Fatalf("invalid --session-mode %q; expected auto, fresh, or reuse", *sessionMode) + } + + return service.Config{ + Host: strings.TrimSpace(*host), + Port: *port, + Pipe: strings.TrimSpace(*pipe), + Cwd: strings.TrimSpace(*cwd), + CurrentFilePath: strings.TrimSpace(*currentFilePath), + Mode: strings.TrimSpace(*mode), + ShellType: strings.TrimSpace(*shellType), + SessionMode: parsedSessionMode, + Timeout: time.Duration(*timeoutSeconds) * time.Second, + } +} + +func envString(key string, fallback string) string { + if value := strings.TrimSpace(os.Getenv(key)); value != "" { + return value + } + return fallback +} + +func envInt(key string, fallback int) int { + if value := strings.TrimSpace(os.Getenv(key)); value != "" { + if n, err := strconv.Atoi(value); err == nil { + return n + } + } + return fallback +} + +func currentDir() string { + if wd, err := os.Getwd(); err == nil { + return wd + } + return "." 
+} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..72e0d6e --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module lingma-ipc-proxy + +go 1.25.0 + +require ( + github.com/Microsoft/go-winio v0.6.2 // indirect + golang.org/x/sys v0.10.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a001052 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= +github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/httpapi/server.go b/internal/httpapi/server.go new file mode 100644 index 0000000..c0de7d6 --- /dev/null +++ b/internal/httpapi/server.go @@ -0,0 +1,402 @@ +package httpapi + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "lingma-ipc-proxy/internal/service" +) + +type Server struct { + svc *service.Service + http *http.Server + sem chan struct{} +} + +type anthropicRequest struct { + Model string `json:"model"` + MaxTokens int `json:"max_tokens,omitempty"` + System any `json:"system,omitempty"` + Messages []rawMessage `json:"messages"` + Stream bool `json:"stream,omitempty"` +} + +type openAIChatRequest struct { + Model string `json:"model"` + Messages []rawMessage `json:"messages"` + Stream bool `json:"stream,omitempty"` + MaxTokens int `json:"max_tokens,omitempty"` + MaxCompletionTokens int `json:"max_completion_tokens,omitempty"` +} + +type rawMessage struct { + Role string `json:"role"` + Content any `json:"content"` +} + +type modelResponse struct { + ID string `json:"id"` + Object string `json:"object"` + Created int64 `json:"created"` + OwnedBy string `json:"owned_by"` +} + +func NewServer(addr string, svc *service.Service) *Server { + s := &Server{ + svc: svc, + sem: make(chan struct{}, 1), + } + 
mux := http.NewServeMux() + mux.HandleFunc("/", s.handleRoot) + mux.HandleFunc("/health", s.handleRoot) + mux.HandleFunc("/v1/models", s.handleModels) + mux.HandleFunc("/v1/messages", s.handleAnthropicMessages) + mux.HandleFunc("/v1/chat/completions", s.handleOpenAIChatCompletions) + + s.http = &http.Server{ + Addr: addr, + Handler: withCORS(mux), + ReadHeaderTimeout: 10 * time.Second, + } + return s +} + +func (s *Server) ListenAndServe() error { + return s.http.ListenAndServe() +} + +func (s *Server) Shutdown(ctx context.Context) error { + err := s.http.Shutdown(ctx) + closeErr := s.svc.Close() + if err != nil { + return err + } + return closeErr +} + +func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" && r.URL.Path != "/health" { + writeOpenAIError(w, http.StatusNotFound, "not_found_error", "not found") + return + } + if r.Method == http.MethodOptions { + w.WriteHeader(http.StatusNoContent) + return + } + if r.Method != http.MethodGet { + writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed") + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "ok": true, + "service": "lingma-ipc-proxy", + "state": s.svc.State(), + }) +} + +func (s *Server) handleModels(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodOptions { + w.WriteHeader(http.StatusNoContent) + return + } + if r.Method != http.MethodGet { + writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed") + return + } + + models, err := s.svc.ListModels(r.Context()) + if err != nil { + writeOpenAIError(w, http.StatusInternalServerError, "api_error", err.Error()) + return + } + + data := make([]modelResponse, 0, len(models)) + created := time.Now().Unix() + for _, model := range models { + data = append(data, modelResponse{ + ID: model.ID, + Object: "model", + Created: created, + OwnedBy: "lingma", + }) + } + writeJSON(w, http.StatusOK, map[string]any{ + "object": 
"list", + "data": data, + }) +} + +func (s *Server) handleAnthropicMessages(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodOptions { + w.WriteHeader(http.StatusNoContent) + return + } + if r.Method != http.MethodPost { + writeAnthropicError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed") + return + } + if !s.tryAcquire() { + writeAnthropicError(w, http.StatusTooManyRequests, "rate_limit_error", "Lingma IPC proxy handles one request at a time.") + return + } + defer s.release() + + var req anthropicRequest + if err := decodeJSON(r, &req); err != nil { + writeAnthropicError(w, http.StatusBadRequest, "invalid_request_error", err.Error()) + return + } + if req.Stream { + writeAnthropicError(w, http.StatusBadRequest, "invalid_request_error", "streaming is not supported") + return + } + + normalized, err := normalizeAnthropicRequest(req) + if err != nil { + writeAnthropicError(w, http.StatusBadRequest, "invalid_request_error", err.Error()) + return + } + + result, err := s.svc.Generate(r.Context(), normalized) + if err != nil { + writeAnthropicError(w, http.StatusInternalServerError, "api_error", err.Error()) + return + } + + writeJSON(w, http.StatusOK, map[string]any{ + "id": fmt.Sprintf("msg_%d", time.Now().UnixNano()), + "type": "message", + "role": "assistant", + "content": []map[string]any{{"type": "text", "text": result.Text}}, + "model": result.Model, + "stop_reason": "end_turn", + "stop_sequence": nil, + "usage": map[string]any{ + "input_tokens": result.InputTokens, + "output_tokens": result.OutputTokens, + }, + }) +} + +func (s *Server) handleOpenAIChatCompletions(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodOptions { + w.WriteHeader(http.StatusNoContent) + return + } + if r.Method != http.MethodPost { + writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed") + return + } + if !s.tryAcquire() { + writeOpenAIError(w, http.StatusTooManyRequests, 
"rate_limit_error", "Lingma IPC proxy handles one request at a time.") + return + } + defer s.release() + + var req openAIChatRequest + if err := decodeJSON(r, &req); err != nil { + writeOpenAIError(w, http.StatusBadRequest, "invalid_request_error", err.Error()) + return + } + if req.Stream { + writeOpenAIError(w, http.StatusBadRequest, "invalid_request_error", "streaming is not supported") + return + } + + normalized, err := normalizeOpenAIRequest(req) + if err != nil { + writeOpenAIError(w, http.StatusBadRequest, "invalid_request_error", err.Error()) + return + } + + result, err := s.svc.Generate(r.Context(), normalized) + if err != nil { + writeOpenAIError(w, http.StatusInternalServerError, "api_error", err.Error()) + return + } + + created := time.Now().Unix() + writeJSON(w, http.StatusOK, map[string]any{ + "id": fmt.Sprintf("chatcmpl-%d", time.Now().UnixNano()), + "object": "chat.completion", + "created": created, + "model": result.Model, + "choices": []map[string]any{ + { + "index": 0, + "message": map[string]any{ + "role": "assistant", + "content": result.Text, + }, + "finish_reason": "stop", + }, + }, + "usage": map[string]any{ + "prompt_tokens": result.InputTokens, + "completion_tokens": result.OutputTokens, + "total_tokens": result.InputTokens + result.OutputTokens, + }, + }) +} + +func normalizeAnthropicRequest(req anthropicRequest) (service.ChatRequest, error) { + messages := make([]service.ChatMessage, 0, len(req.Messages)) + for _, message := range req.Messages { + role := strings.ToLower(strings.TrimSpace(message.Role)) + text := strings.TrimSpace(extractText(message.Content)) + if role != "user" && role != "assistant" { + continue + } + if text == "" { + continue + } + messages = append(messages, service.ChatMessage{Role: role, Text: text}) + } + if len(messages) == 0 { + return service.ChatRequest{}, fmt.Errorf("no user or assistant messages found") + } + return service.ChatRequest{ + Model: strings.TrimSpace(req.Model), + System: 
// decodeJSON decodes the request body as a single JSON value into out.
//
// Numbers are decoded as json.Number (the decoder runs with UseNumber) so
// that large integers are not silently converted to float64. The body is
// closed before returning. Any failure is reported as an error prefixed with
// "invalid JSON body: " that wraps the underlying cause.
func decodeJSON(r *http.Request, out any) error {
	// The proxy accepts requests from any origin, so the body is untrusted.
	// Cap how much of it is read so one oversized request cannot exhaust
	// memory; chat payloads are text and stay far below this bound.
	const maxBodyBytes = 16 << 20

	src := r.Body
	if src == nil {
		// Server requests always carry a body, but hand-built requests may
		// not; treat that as an empty body instead of panicking.
		src = http.NoBody
	}
	// A nil ResponseWriter is fine here: the reader only uses it to hint
	// that the connection should be closed once the limit is exceeded.
	body := http.MaxBytesReader(nil, src, maxBodyBytes)
	defer body.Close()

	decoder := json.NewDecoder(body)
	decoder.UseNumber()
	if err := decoder.Decode(out); err != nil {
		return fmt.Errorf("invalid JSON body: %w", err)
	}
	return nil
}
// writeJSON serializes data as JSON and sends it with the given status code
// and a "Content-Type: application/json" header.
//
// The payload is encoded before anything is written to w. If encoding fails,
// the response becomes a 500 with a small JSON error object instead of the
// requested status followed by an empty or truncated body.
func writeJSON(w http.ResponseWriter, status int, data any) {
	payload, err := json.Marshal(data)
	if err != nil {
		// Nothing has been sent yet, so the failure can still be reported
		// with an honest status code.
		status = http.StatusInternalServerError
		payload = []byte(`{"error":{"message":"failed to encode response body","type":"api_error"}}`)
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	// Keep the trailing newline that json.Encoder would have emitted. A write
	// error means the client has gone away; there is nobody left to tell.
	_, _ = w.Write(append(payload, '\n'))
}
MetaCurrentFilePath = "ai-coding/current-file-path" + MetaEnabledMCP = "ai-coding/enabled-mcp-servers" +) + +type MetaOptions struct { + RequestID string + Mode string + Model string + ShellType string + CurrentFilePath string + EnabledMCP []any +} + +type Notification struct { + JSONRPC string `json:"jsonrpc"` + Method string `json:"method"` + Params map[string]any `json:"params,omitempty"` +} + +type rpcError struct { + Code int `json:"code"` + Message string `json:"message"` + Data json.RawMessage `json:"data,omitempty"` +} + +type responseEnvelope struct { + JSONRPC string `json:"jsonrpc"` + ID *int `json:"id,omitempty"` + Method string `json:"method,omitempty"` + Params map[string]any `json:"params,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + Error *rpcError `json:"error,omitempty"` +} + +type Client struct { + conn net.Conn + reader *bufio.Reader + writeMu sync.Mutex + pendingMu sync.Mutex + pending map[int]chan responseEnvelope + subsMu sync.RWMutex + subs map[int]chan Notification + nextID atomic.Int64 + nextSubID atomic.Int64 + closeOnce sync.Once + closed chan struct{} + closeErr atomic.Value +} + +func ResolvePipePath(explicit string) (string, error) { + if runtime.GOOS != "windows" { + return "", errors.New("Lingma IPC proxy currently requires Windows") + } + + if pipe := strings.TrimSpace(explicit); pipe != "" { + return normalizePipePath(pipe), nil + } + if pipe := strings.TrimSpace(os.Getenv("LINGMA_IPC_PIPE")); pipe != "" { + return normalizePipePath(pipe), nil + } + + entries, err := os.ReadDir(PipeDir) + if err != nil { + return "", fmt.Errorf("enumerate Lingma named pipes: %w", err) + } + + names := make([]string, 0, len(entries)) + for _, entry := range entries { + name := entry.Name() + if strings.HasPrefix(name, PipePrefix) { + names = append(names, name) + } + } + sort.Strings(names) + if len(names) == 0 { + return "", errors.New("no active Lingma named pipe was found") + } + return PipeDir + names[len(names)-1], nil +} + 
// DefaultShellType picks the shell name reported to Lingma when the caller
// did not configure one.
//
// Resolution order: the trimmed LINGMA_PROXY_SHELL_TYPE environment variable
// if it is non-empty; "powershell" on Windows; the last path segment of the
// SHELL environment variable (split on '/' or '\'); and finally "sh".
func DefaultShellType() string {
	override := strings.TrimSpace(os.Getenv("LINGMA_PROXY_SHELL_TYPE"))
	switch {
	case override != "":
		return override
	case runtime.GOOS == "windows":
		return "powershell"
	}

	// An empty or separator-only SHELL yields no segments and falls through
	// to the generic default below.
	isSeparator := func(r rune) bool { return r == '/' || r == '\\' }
	segments := strings.FieldsFunc(strings.TrimSpace(os.Getenv("SHELL")), isSeparator)
	if n := len(segments); n > 0 {
		return segments[n-1]
	}
	return "sh"
}
make(chan struct{}), + } + go client.readLoop() + return client, nil +} + +func (c *Client) Request(ctx context.Context, method string, params any, out any) error { + if params == nil { + params = map[string]any{} + } + + id := int(c.nextID.Add(1)) + payload := map[string]any{ + "jsonrpc": "2.0", + "id": id, + "method": method, + "params": params, + } + + body, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshal request %s: %w", method, err) + } + + responseCh := make(chan responseEnvelope, 1) + c.pendingMu.Lock() + c.pending[id] = responseCh + c.pendingMu.Unlock() + + if err := c.writeFrame(body); err != nil { + c.pendingMu.Lock() + delete(c.pending, id) + c.pendingMu.Unlock() + return err + } + + select { + case <-ctx.Done(): + c.pendingMu.Lock() + delete(c.pending, id) + c.pendingMu.Unlock() + return ctx.Err() + case <-c.closed: + return c.closeError() + case resp := <-responseCh: + if resp.Error != nil { + return fmt.Errorf("Lingma IPC %s failed: %s", method, resp.Error.Message) + } + if out == nil || len(resp.Result) == 0 || string(resp.Result) == "null" { + return nil + } + if err := json.Unmarshal(resp.Result, out); err != nil { + return fmt.Errorf("decode %s result: %w", method, err) + } + return nil + } +} + +func (c *Client) Subscribe() (<-chan Notification, func()) { + id := int(c.nextSubID.Add(1)) + ch := make(chan Notification, 2048) + c.subsMu.Lock() + c.subs[id] = ch + c.subsMu.Unlock() + + cancel := func() { + c.subsMu.Lock() + if sub, ok := c.subs[id]; ok { + delete(c.subs, id) + close(sub) + } + c.subsMu.Unlock() + } + return ch, cancel +} + +func (c *Client) Close() error { + c.closeOnce.Do(func() { + close(c.closed) + if err := c.conn.Close(); err != nil { + c.closeErr.Store(err) + } + c.failPending(io.EOF) + c.closeAllSubs() + }) + if v := c.closeErr.Load(); v != nil { + return v.(error) + } + return nil +} + +func (c *Client) writeFrame(body []byte) error { + c.writeMu.Lock() + defer c.writeMu.Unlock() + + frame := 
[]byte(fmt.Sprintf("Content-Length: %d\r\n\r\n", len(body))) + if _, err := c.conn.Write(frame); err != nil { + return fmt.Errorf("write frame header: %w", err) + } + if _, err := c.conn.Write(body); err != nil { + return fmt.Errorf("write frame body: %w", err) + } + return nil +} + +func (c *Client) readLoop() { + defer c.Close() + for { + body, err := c.readFrame() + if err != nil { + if !errors.Is(err, net.ErrClosed) && !errors.Is(err, io.EOF) { + c.closeErr.Store(err) + } + return + } + + var envelope responseEnvelope + if err := json.Unmarshal(body, &envelope); err != nil { + c.closeErr.Store(fmt.Errorf("decode IPC frame: %w", err)) + return + } + + if envelope.Method != "" && envelope.ID == nil { + c.broadcast(Notification{JSONRPC: envelope.JSONRPC, Method: envelope.Method, Params: envelope.Params}) + continue + } + + if envelope.ID == nil { + continue + } + + c.pendingMu.Lock() + ch, ok := c.pending[*envelope.ID] + if ok { + delete(c.pending, *envelope.ID) + } + c.pendingMu.Unlock() + if ok { + ch <- envelope + close(ch) + } + } +} + +func (c *Client) readFrame() ([]byte, error) { + contentLength := -1 + for { + line, err := c.reader.ReadString('\n') + if err != nil { + return nil, err + } + if line == "\r\n" { + break + } + line = strings.TrimSpace(line) + if strings.HasPrefix(strings.ToLower(line), "content-length:") { + raw := strings.TrimSpace(line[len("content-length:"):]) + n, err := strconv.Atoi(raw) + if err != nil { + return nil, fmt.Errorf("parse content length %q: %w", raw, err) + } + contentLength = n + } + } + if contentLength < 0 { + return nil, errors.New("missing Content-Length header") + } + + body := make([]byte, contentLength) + if _, err := io.ReadFull(c.reader, body); err != nil { + return nil, err + } + return body, nil +} + +func (c *Client) broadcast(notification Notification) { + c.subsMu.RLock() + defer c.subsMu.RUnlock() + for _, ch := range c.subs { + ch <- notification + } +} + +func (c *Client) failPending(err error) { + 
c.pendingMu.Lock() + defer c.pendingMu.Unlock() + for id, ch := range c.pending { + delete(c.pending, id) + ch <- responseEnvelope{Error: &rpcError{Message: err.Error()}} + close(ch) + } +} + +func (c *Client) closeAllSubs() { + c.subsMu.Lock() + defer c.subsMu.Unlock() + for id, ch := range c.subs { + delete(c.subs, id) + close(ch) + } +} + +func (c *Client) closeError() error { + if v := c.closeErr.Load(); v != nil { + return v.(error) + } + return io.EOF +} + +func valueOr(value string, fallback string) string { + if strings.TrimSpace(value) != "" { + return strings.TrimSpace(value) + } + return fallback +} + +func emptySliceIfNil(v []any) []any { + if v == nil { + return []any{} + } + return v +} diff --git a/internal/service/service.go b/internal/service/service.go new file mode 100644 index 0000000..615a89c --- /dev/null +++ b/internal/service/service.go @@ -0,0 +1,584 @@ +package service + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "sort" + "strings" + "sync" + "time" + + "lingma-ipc-proxy/internal/lingmaipc" +) + +type SessionMode string + +const ( + SessionModeAuto SessionMode = "auto" + SessionModeFresh SessionMode = "fresh" + SessionModeReuse SessionMode = "reuse" +) + +type Config struct { + Host string + Port int + Pipe string + Cwd string + CurrentFilePath string + Mode string + ShellType string + SessionMode SessionMode + Timeout time.Duration +} + +type ChatMessage struct { + Role string + Text string +} + +type ChatRequest struct { + Model string + System string + Messages []ChatMessage +} + +type ChatResult struct { + Text string + Model string + InputTokens int + OutputTokens int + SessionID string + RequestID string + FinishReason string + StopReason string + UsedTokens int + LimitTokens int + PipePath string + EffectiveSession SessionMode +} + +type Model struct { + ID string `json:"id"` + Name string `json:"name"` + Scene string `json:"scene,omitempty"` +} + +type State struct { + PipePath string 
`json:"pipe_path,omitempty"` + Connected bool `json:"connected"` + StickySessionID string `json:"sticky_session_id,omitempty"` + SessionMode SessionMode `json:"session_mode"` +} + +type Service struct { + cfg Config + mu sync.Mutex + client *lingmaipc.Client + pipePath string + stickySessionID string +} + +type promptRunResult struct { + PromptResult map[string]any + FinishData map[string]any + ContextUsage map[string]any + AssistantText string + TimedOut bool +} + +func New(cfg Config) *Service { + if strings.TrimSpace(cfg.Cwd) == "" { + if wd, err := os.Getwd(); err == nil { + cfg.Cwd = wd + } + } + if strings.TrimSpace(cfg.Mode) == "" { + cfg.Mode = "agent" + } + if strings.TrimSpace(cfg.ShellType) == "" { + cfg.ShellType = lingmaipc.DefaultShellType() + } + if cfg.Timeout <= 0 { + cfg.Timeout = 120 * time.Second + } + if cfg.SessionMode == "" { + cfg.SessionMode = SessionModeAuto + } + return &Service{cfg: cfg} +} + +func (s *Service) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.closeClientLocked() +} + +func (s *Service) State() State { + s.mu.Lock() + defer s.mu.Unlock() + return State{ + PipePath: s.pipePath, + Connected: s.client != nil, + StickySessionID: s.stickySessionID, + SessionMode: s.cfg.SessionMode, + } +} + +func (s *Service) ListModels(ctx context.Context) ([]Model, error) { + s.mu.Lock() + defer s.mu.Unlock() + + ipcClient, err := s.ensureConnectedLocked(ctx) + if err != nil { + return nil, err + } + + var raw any + if err := ipcClient.Request(ctx, "config/queryModels", map[string]any{}, &raw); err != nil { + return nil, err + } + + models := extractModels(raw) + if len(models) == 0 { + models = []Model{{ID: "lingma", Name: "Lingma", Scene: "default"}} + } + return models, nil +} + +func (s *Service) Generate(ctx context.Context, req ChatRequest) (*ChatResult, error) { + s.mu.Lock() + defer s.mu.Unlock() + + requestCtx, cancel := context.WithTimeout(ctx, s.cfg.Timeout) + defer cancel() + + ipcClient, err := 
s.ensureConnectedLocked(requestCtx) + if err != nil { + return nil, err + } + + effectiveMode := resolveSessionMode(req, s.cfg.SessionMode) + prompt, err := buildLingmaPrompt(req, effectiveMode) + if err != nil { + return nil, err + } + if strings.TrimSpace(prompt) == "" { + return nil, errors.New("empty user message") + } + + sessionID, err := s.resolveSessionLocked(requestCtx, ipcClient, effectiveMode) + if err != nil { + return nil, err + } + + requestID := lingmaipc.CreateRequestID("serve") + meta := lingmaipc.CreateMeta(lingmaipc.MetaOptions{ + RequestID: requestID, + Mode: s.cfg.Mode, + Model: req.Model, + ShellType: s.cfg.ShellType, + CurrentFilePath: s.cfg.CurrentFilePath, + EnabledMCP: []any{}, + }) + + if strings.TrimSpace(req.Model) != "" { + if err := ipcClient.Request(requestCtx, "session/set_model", map[string]any{ + "sessionId": sessionID, + "modelId": strings.TrimSpace(req.Model), + "timestamp": time.Now().UnixMilli(), + "_meta": meta, + }, nil); err != nil { + if effectiveMode == SessionModeReuse { + s.stickySessionID = "" + } + return nil, err + } + } + + runResult, err := s.runPromptLocked(requestCtx, ipcClient, sessionID, prompt, requestID, meta) + if err != nil { + if effectiveMode == SessionModeReuse { + s.stickySessionID = "" + } + return nil, err + } + if runResult.TimedOut || strings.TrimSpace(runResult.AssistantText) == "" { + if effectiveMode == SessionModeReuse { + s.stickySessionID = "" + } + } + if runResult.TimedOut && strings.TrimSpace(runResult.AssistantText) == "" { + return nil, errors.New("timed out while waiting for Lingma IPC to finish responding") + } + if strings.TrimSpace(runResult.AssistantText) == "" { + return nil, errors.New("Lingma IPC did not produce an assistant reply") + } + if runResult.TimedOut { + return nil, fmt.Errorf("Lingma IPC response remained incomplete before timeout. 
Partial reply: %s", truncate(runResult.AssistantText, 120)) + } + + result := &ChatResult{ + Text: runResult.AssistantText, + Model: valueOr(strings.TrimSpace(req.Model), "lingma"), + InputTokens: estimateTokens(prompt), + OutputTokens: estimateTokens(runResult.AssistantText), + SessionID: sessionID, + RequestID: requestID, + FinishReason: nestedString(runResult.FinishData, "reason"), + StopReason: nestedString(runResult.PromptResult, "stopReason"), + UsedTokens: int(nestedInt64(runResult.ContextUsage, "usedTokens")), + LimitTokens: int(nestedInt64(runResult.ContextUsage, "limitTokens")), + PipePath: s.pipePath, + EffectiveSession: effectiveMode, + } + return result, nil +}

// ensureConnectedLocked returns a usable IPC client. The cached client is
// reused when it still answers a "ping" within five seconds; otherwise it is
// closed and a new connection is dialed and initialized.
//
// NOTE(review): the "Locked" suffix suggests the caller must already hold the
// Service lock; the lock is not visible in this part of the file — confirm at
// the call sites.
func (s *Service) ensureConnectedLocked(ctx context.Context) (*lingmaipc.Client, error) {
	// A request that is already cancelled would fail the health check below
	// and tear down a healthy connection (and the sticky session with it).
	if err := ctx.Err(); err != nil {
		return nil, err
	}

	if s.client != nil {
		pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		var pong any
		err := s.client.Request(pingCtx, "ping", map[string]any{}, &pong)
		cancel() // release the ping timer now instead of at function return
		if err == nil {
			return s.client, nil
		}
		// If the caller's own context ended during the ping, the failure says
		// nothing about the connection: keep it for the next request.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, ctxErr
		}
		// The client is being discarded because it is unhealthy; an error
		// from closing it carries no extra information.
		_ = s.closeClientLocked()
	}

	pipePath, err := lingmaipc.ResolvePipePath(s.cfg.Pipe)
	if err != nil {
		return nil, fmt.Errorf("resolve Lingma pipe path: %w", err)
	}
	client, err := lingmaipc.Connect(ctx, pipePath)
	if err != nil {
		return nil, fmt.Errorf("connect to Lingma pipe %q: %w", pipePath, err)
	}

	initParams := map[string]any{
		"protocolVersion":    1,
		"clientCapabilities": map[string]any{},
		"timestamp":          time.Now().UnixMilli(),
	}
	if err := client.Request(ctx, "initialize", initParams, nil); err != nil {
		// The handshake error is the one worth reporting; the close is cleanup.
		_ = client.Close()
		return nil, fmt.Errorf("initialize Lingma IPC connection: %w", err)
	}

	s.client = client
	s.pipePath = pipePath
	return client, nil
}

// closeClientLocked forgets the cached client, pipe path and sticky session
// ID, then closes the client if there was one. It returns the Close error, or
// nil when no client was cached.
func (s *Service) closeClientLocked() error {
	client := s.client
	s.client = nil
	s.pipePath = ""
	s.stickySessionID = ""
	if client == nil {
		return nil
	}
	return client.Close()
}

// resolveSessionLocked returns the session ID to use for a prompt.
//
// In SessionModeReuse the remembered sticky session is returned when one
// exists. In every other case a session is created with "session/new"; the ID
// is read from the "sessionId" field of the reply, falling back to "id". A
// newly created session is remembered only in SessionModeReuse.
func (s *Service) resolveSessionLocked(ctx context.Context, client *lingmaipc.Client, mode SessionMode) (string, error) {
	if mode == SessionModeReuse && strings.TrimSpace(s.stickySessionID) != "" {
		return s.stickySessionID, nil
	}

	var created struct {
		SessionID string `json:"sessionId"`
		ID        string `json:"id"`
	}
	params := map[string]any{
		"cwd":        s.cfg.Cwd,
		"mcpServers": []any{},
		"_meta":      map[string]any{},
		"timestamp":  time.Now().UnixMilli(),
	}
	if err := client.Request(ctx, "session/new", params, &created); err != nil {
		return "", err
	}

	sessionID := strings.TrimSpace(created.SessionID)
	if sessionID == "" {
		sessionID = strings.TrimSpace(created.ID)
	}
	if sessionID == "" {
		return "", errors.New("Lingma IPC did not return a sessionId")
	}

	if mode == SessionModeReuse {
		s.stickySessionID = sessionID
	}
	return sessionID, nil
}

// runPromptLocked sends text to sessionID via "session/prompt" and collects
// the assistant reply from "session/update" notifications whose
// _meta request ID equals requestID.
//
// It returns when one of the following happens:
//   - a "chat_finish" notification arrives: the accumulated text is returned;
//   - ctx ends (deadline or cancellation): the text accumulated so far is
//     returned with TimedOut set and a nil error;
//   - the notification channel is closed: the accumulated text is returned,
//     or an error if nothing was received.
func (s *Service) runPromptLocked(
	ctx context.Context,
	client *lingmaipc.Client,
	sessionID string,
	text string,
	requestID string,
	meta map[string]any,
) (*promptRunResult, error) {
	// Subscribe before sending the prompt — presumably so that updates
	// emitted while the request is still in flight are not missed.
	notifications, cancel := client.Subscribe()
	defer cancel()

	promptResult := map[string]any{}
	if err := client.Request(ctx, "session/prompt", map[string]any{
		"sessionId": sessionID,
		"prompt": []map[string]any{
			{"type": "text", "text": text},
		},
		"_meta": meta,
	}, &promptResult); err != nil {
		return nil, err
	}

	result := &promptRunResult{PromptResult: promptResult}
	var builder strings.Builder

	for {
		select {
		case <-ctx.Done():
			result.AssistantText = builder.String()
			result.TimedOut = true
			return result, nil
		case notification, ok := <-notifications:
			if !ok {
				result.AssistantText = builder.String()
				if result.AssistantText == "" {
					return nil, errors.New("Lingma IPC notification stream closed")
				}
				return result, nil
			}
			if notification.Method != "session/update" {
				continue
			}
			// Ignore updates that belong to a different request.
			if nestedStringFromMap(notification.Params, "_meta", lingmaipc.MetaRequestID) != requestID {
				continue
			}

			update := nestedMap(notification.Params, "update")
			switch nestedString(update, "sessionUpdate") {
			case "agent_message_chunk":
				chunk := nestedString(nestedMap(update, "content"), "text")
				if chunk != "" {
					builder.WriteString(chunk)
				}
			case "notification":
				switch nestedString(update, "type") {
				case "context_usage":
					result.ContextUsage = nestedMap(update, "data")
				case "chat_finish":
					result.FinishData = nestedMap(update, "data")
					result.AssistantText = builder.String()
					return result, nil
				}
			}
		}
	}
}

// resolveSessionMode returns the session mode to apply to req. Any configured
// mode other than SessionModeAuto wins. In auto mode a request with a system
// prompt or more than one usable message gets a fresh session; everything
// else reuses the sticky one.
func resolveSessionMode(req ChatRequest, configured SessionMode) SessionMode {
	if configured != SessionModeAuto {
		return configured
	}
	if strings.TrimSpace(req.System) != "" || len(filteredMessages(req.Messages)) > 1 {
		return SessionModeFresh
	}
	return SessionModeReuse
}

// buildLingmaPrompt turns req into the single text prompt sent to Lingma.
//
// Only the latest user message is sent in SessionModeReuse, or when the
// request has no system prompt and exactly one usable message. Otherwise the
// system prompt and the whole conversation are flattened into a labelled
// transcript. An error is returned when the request contains no user message.
func buildLingmaPrompt(req ChatRequest, mode SessionMode) (string, error) {
	messages := filteredMessages(req.Messages)
	system := strings.TrimSpace(req.System)

	var lastUser string
	for i := len(messages) - 1; i >= 0; i-- {
		if messages[i].Role == "user" {
			lastUser = messages[i].Text
			break
		}
	}
	if strings.TrimSpace(lastUser) == "" {
		return "", errors.New("no user message found in request")
	}
	if mode == SessionModeReuse {
		return lastUser, nil
	}
	if system == "" && len(messages) == 1 {
		return lastUser, nil
	}

	parts := make([]string, 0, len(messages)+4)
	if system != "" {
		parts = append(parts, "System instructions:", system)
	}
	parts = append(parts, "Conversation transcript:")
	for _, message := range messages {
		role := "User"
		if message.Role == "assistant" {
			role = "Assistant"
		}
		parts = append(parts, fmt.Sprintf("%s: %s", role, message.Text))
	}
	parts = append(parts, "Reply as the assistant to the latest user message only. Follow the system instructions and prior transcript naturally.")
	return strings.Join(parts, "\n\n"), nil
}

// filteredMessages returns the user and assistant messages that have
// non-blank text, with the role lower-cased and both fields trimmed. Messages
// with any other role are dropped.
func filteredMessages(messages []ChatMessage) []ChatMessage {
	out := make([]ChatMessage, 0, len(messages))
	for _, message := range messages {
		role := strings.ToLower(strings.TrimSpace(message.Role))
		text := strings.TrimSpace(message.Text)
		if text == "" {
			continue
		}
		if role != "user" && role != "assistant" {
			continue
		}
		out = append(out, ChatMessage{Role: role, Text: text})
	}
	return out
}

// estimateTokens returns a rough token count for text: one token per three
// runes of the trimmed text, rounded up, and never less than 1.
func estimateTokens(text string) int {
	runes := 0
	// Ranging over a string yields one iteration per rune, so this counts
	// runes without allocating a []rune.
	for range strings.TrimSpace(text) {
		runes++
	}
	return max(1, (runes+2)/3)
}

// extractModels walks an arbitrarily nested decoded-JSON value and collects
// every object that looks like a model: it has an ID under "id", "modelId" or
// "key" and either a display name or an ID accepted by likelyModelID. The
// result is de-duplicated by ID and sorted by ID.
//
// The scene of a model is inherited from the enclosing map key (any key when
// no scene is set yet, or a key accepted by isSceneKey), falling back to the
// object's own "scene", "sceneId" or "category" field.
func extractModels(raw any) []Model {
	seen := make(map[string]Model)
	var walk func(scene string, value any)
	walk = func(scene string, value any) {
		switch typed := value.(type) {
		case map[string]any:
			id := firstString(typed, "id", "modelId", "key")
			name := firstString(typed, "name", "label", "displayName", "title")
			currentScene := scene
			if currentScene == "" {
				currentScene = firstString(typed, "scene", "sceneId", "category")
			}
			if id != "" && (name != "" || likelyModelID(id)) {
				if name == "" {
					name = id
				}
				seen[id] = Model{ID: id, Name: name, Scene: currentScene}
			}

			// Map iteration order is random in Go. Visiting children in
			// sorted key order makes the winner deterministic when the same
			// ID appears under several keys.
			keys := make([]string, 0, len(typed))
			for key := range typed {
				keys = append(keys, key)
			}
			sort.Strings(keys)
			for _, key := range keys {
				nextScene := currentScene
				if nextScene == "" || isSceneKey(key) {
					nextScene = key
				}
				walk(nextScene, typed[key])
			}
		case []any:
			for _, item := range typed {
				walk(scene, item)
			}
		}
	}
	walk("", raw)

	models := make([]Model, 0, len(seen))
	for _, model := range seen {
		models = append(models, model)
	}
	sort.Slice(models, func(i, j int) bool { return models[i].ID < models[j].ID })
	return models
}

// likelyModelID reports whether id, compared case-insensitively, contains one
// of the substrings "qwen", "model", "auto" or "coder".
func likelyModelID(id string) bool {
	lowered := strings.ToLower(id)
	for _, marker := range [...]string{"qwen", "model", "auto", "coder"} {
		if strings.Contains(lowered, marker) {
			return true
		}
	}
	return false
}

// isSceneKey reports whether key (trimmed, case-insensitive) is one of the
// scene names "assistant", "chat", "developer", "inline" or "quest".
func isSceneKey(key string) bool {
	switch strings.ToLower(strings.TrimSpace(key)) {
	case "assistant", "chat", "developer", "inline", "quest":
		return true
	default:
		return false
	}
}

// firstString returns the trimmed value of the first key in keys whose value
// in m is a non-blank string, or the text of the first json.Number found.
// It returns "" when no key qualifies.
func firstString(m map[string]any, keys ...string) string {
	for _, key := range keys {
		switch typed := m[key].(type) {
		case string:
			if trimmed := strings.TrimSpace(typed); trimmed != "" {
				return trimmed
			}
		case json.Number:
			return typed.String()
		}
	}
	return ""
}

// nestedMap returns m[key] when it is a map[string]any, and a new empty map
// otherwise, so callers can chain lookups without nil checks.
func nestedMap(m map[string]any, key string) map[string]any {
	if typed, ok := m[key].(map[string]any); ok {
		return typed
	}
	return map[string]any{}
}

// nestedString returns m[key] as a string. Strings are returned unchanged,
// json.Number values as their literal text, and float64 values formatted
// without a fractional part. Any other type, or a missing key, yields "".
func nestedString(m map[string]any, key string) string {
	switch typed := m[key].(type) {
	case string:
		return typed
	case json.Number:
		return typed.String()
	case float64:
		return fmt.Sprintf("%.0f", typed)
	}
	return ""
}

// nestedStringFromMap returns nestedString(m[parent], key), treating a
// missing or non-map parent as an empty map.
func nestedStringFromMap(m map[string]any, parent string, key string) string {
	return nestedString(nestedMap(m, parent), key)
}

// nestedInt64 returns m[key] as an int64. It accepts int, int64, float64
// (truncated) and json.Number; any other type, a missing key, or an
// unparsable number yields 0.
func nestedInt64(m map[string]any, key string) int64 {
	switch typed := m[key].(type) {
	case int:
		return int64(typed)
	case int64:
		return typed
	case float64:
		return int64(typed)
	case json.Number:
		if n, err := typed.Int64(); err == nil {
			return n
		}
		// A number written with a fraction or exponent is not a valid
		// integer literal; truncate it like the float64 case does.
		if f, err := typed.Float64(); err == nil {
			return int64(f)
		}
	}
	return 0
}

// truncate returns text cut to at most limit runes. A non-positive limit
// yields "". The cut is made on a rune boundary of the original bytes.
func truncate(text string, limit int) string {
	if limit <= 0 {
		return ""
	}
	count := 0
	for i := range text {
		// i is the byte offset of the (count+1)-th rune.
		if count == limit {
			return text[:i]
		}
		count++
	}
	return text
}

// valueOr returns value with surrounding whitespace removed, or fallback when
// value is blank.
func valueOr(value string, fallback string) string {
	if trimmed := strings.TrimSpace(value); trimmed != "" {
		return trimmed
	}
	return fallback
}