feat: add Lingma IPC proxy service

This commit is contained in:
coolxll
2026-03-25 21:35:19 +08:00
commit 585c3ba5ab
8 changed files with 1614 additions and 0 deletions

402
internal/httpapi/server.go Normal file
View File

@@ -0,0 +1,402 @@
package httpapi
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
"lingma-ipc-proxy/internal/service"
)
type Server struct {
svc *service.Service
http *http.Server
sem chan struct{}
}
type anthropicRequest struct {
Model string `json:"model"`
MaxTokens int `json:"max_tokens,omitempty"`
System any `json:"system,omitempty"`
Messages []rawMessage `json:"messages"`
Stream bool `json:"stream,omitempty"`
}
type openAIChatRequest struct {
Model string `json:"model"`
Messages []rawMessage `json:"messages"`
Stream bool `json:"stream,omitempty"`
MaxTokens int `json:"max_tokens,omitempty"`
MaxCompletionTokens int `json:"max_completion_tokens,omitempty"`
}
type rawMessage struct {
Role string `json:"role"`
Content any `json:"content"`
}
type modelResponse struct {
ID string `json:"id"`
Object string `json:"object"`
Created int64 `json:"created"`
OwnedBy string `json:"owned_by"`
}
func NewServer(addr string, svc *service.Service) *Server {
s := &Server{
svc: svc,
sem: make(chan struct{}, 1),
}
mux := http.NewServeMux()
mux.HandleFunc("/", s.handleRoot)
mux.HandleFunc("/health", s.handleRoot)
mux.HandleFunc("/v1/models", s.handleModels)
mux.HandleFunc("/v1/messages", s.handleAnthropicMessages)
mux.HandleFunc("/v1/chat/completions", s.handleOpenAIChatCompletions)
s.http = &http.Server{
Addr: addr,
Handler: withCORS(mux),
ReadHeaderTimeout: 10 * time.Second,
}
return s
}
func (s *Server) ListenAndServe() error {
return s.http.ListenAndServe()
}
func (s *Server) Shutdown(ctx context.Context) error {
err := s.http.Shutdown(ctx)
closeErr := s.svc.Close()
if err != nil {
return err
}
return closeErr
}
func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" && r.URL.Path != "/health" {
writeOpenAIError(w, http.StatusNotFound, "not_found_error", "not found")
return
}
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)
return
}
if r.Method != http.MethodGet {
writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
return
}
writeJSON(w, http.StatusOK, map[string]any{
"ok": true,
"service": "lingma-ipc-proxy",
"state": s.svc.State(),
})
}
func (s *Server) handleModels(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)
return
}
if r.Method != http.MethodGet {
writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
return
}
models, err := s.svc.ListModels(r.Context())
if err != nil {
writeOpenAIError(w, http.StatusInternalServerError, "api_error", err.Error())
return
}
data := make([]modelResponse, 0, len(models))
created := time.Now().Unix()
for _, model := range models {
data = append(data, modelResponse{
ID: model.ID,
Object: "model",
Created: created,
OwnedBy: "lingma",
})
}
writeJSON(w, http.StatusOK, map[string]any{
"object": "list",
"data": data,
})
}
func (s *Server) handleAnthropicMessages(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)
return
}
if r.Method != http.MethodPost {
writeAnthropicError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
return
}
if !s.tryAcquire() {
writeAnthropicError(w, http.StatusTooManyRequests, "rate_limit_error", "Lingma IPC proxy handles one request at a time.")
return
}
defer s.release()
var req anthropicRequest
if err := decodeJSON(r, &req); err != nil {
writeAnthropicError(w, http.StatusBadRequest, "invalid_request_error", err.Error())
return
}
if req.Stream {
writeAnthropicError(w, http.StatusBadRequest, "invalid_request_error", "streaming is not supported")
return
}
normalized, err := normalizeAnthropicRequest(req)
if err != nil {
writeAnthropicError(w, http.StatusBadRequest, "invalid_request_error", err.Error())
return
}
result, err := s.svc.Generate(r.Context(), normalized)
if err != nil {
writeAnthropicError(w, http.StatusInternalServerError, "api_error", err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{
"id": fmt.Sprintf("msg_%d", time.Now().UnixNano()),
"type": "message",
"role": "assistant",
"content": []map[string]any{{"type": "text", "text": result.Text}},
"model": result.Model,
"stop_reason": "end_turn",
"stop_sequence": nil,
"usage": map[string]any{
"input_tokens": result.InputTokens,
"output_tokens": result.OutputTokens,
},
})
}
func (s *Server) handleOpenAIChatCompletions(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)
return
}
if r.Method != http.MethodPost {
writeOpenAIError(w, http.StatusMethodNotAllowed, "invalid_request_error", "method not allowed")
return
}
if !s.tryAcquire() {
writeOpenAIError(w, http.StatusTooManyRequests, "rate_limit_error", "Lingma IPC proxy handles one request at a time.")
return
}
defer s.release()
var req openAIChatRequest
if err := decodeJSON(r, &req); err != nil {
writeOpenAIError(w, http.StatusBadRequest, "invalid_request_error", err.Error())
return
}
if req.Stream {
writeOpenAIError(w, http.StatusBadRequest, "invalid_request_error", "streaming is not supported")
return
}
normalized, err := normalizeOpenAIRequest(req)
if err != nil {
writeOpenAIError(w, http.StatusBadRequest, "invalid_request_error", err.Error())
return
}
result, err := s.svc.Generate(r.Context(), normalized)
if err != nil {
writeOpenAIError(w, http.StatusInternalServerError, "api_error", err.Error())
return
}
created := time.Now().Unix()
writeJSON(w, http.StatusOK, map[string]any{
"id": fmt.Sprintf("chatcmpl-%d", time.Now().UnixNano()),
"object": "chat.completion",
"created": created,
"model": result.Model,
"choices": []map[string]any{
{
"index": 0,
"message": map[string]any{
"role": "assistant",
"content": result.Text,
},
"finish_reason": "stop",
},
},
"usage": map[string]any{
"prompt_tokens": result.InputTokens,
"completion_tokens": result.OutputTokens,
"total_tokens": result.InputTokens + result.OutputTokens,
},
})
}
func normalizeAnthropicRequest(req anthropicRequest) (service.ChatRequest, error) {
messages := make([]service.ChatMessage, 0, len(req.Messages))
for _, message := range req.Messages {
role := strings.ToLower(strings.TrimSpace(message.Role))
text := strings.TrimSpace(extractText(message.Content))
if role != "user" && role != "assistant" {
continue
}
if text == "" {
continue
}
messages = append(messages, service.ChatMessage{Role: role, Text: text})
}
if len(messages) == 0 {
return service.ChatRequest{}, fmt.Errorf("no user or assistant messages found")
}
return service.ChatRequest{
Model: strings.TrimSpace(req.Model),
System: strings.TrimSpace(extractText(req.System)),
Messages: messages,
}, nil
}
func normalizeOpenAIRequest(req openAIChatRequest) (service.ChatRequest, error) {
messages := make([]service.ChatMessage, 0, len(req.Messages))
systemParts := make([]string, 0, 2)
for _, message := range req.Messages {
role := strings.ToLower(strings.TrimSpace(message.Role))
text := strings.TrimSpace(extractText(message.Content))
if text == "" {
continue
}
switch role {
case "system":
systemParts = append(systemParts, text)
case "user", "assistant":
messages = append(messages, service.ChatMessage{Role: role, Text: text})
}
}
if len(messages) == 0 {
return service.ChatRequest{}, fmt.Errorf("no user or assistant messages found")
}
return service.ChatRequest{
Model: strings.TrimSpace(req.Model),
System: strings.Join(systemParts, "\n\n"),
Messages: messages,
}, nil
}
func extractText(content any) string {
switch typed := content.(type) {
case nil:
return ""
case string:
return strings.TrimSpace(typed)
case []any:
parts := make([]string, 0, len(typed))
for _, item := range typed {
text := extractText(item)
if text != "" {
parts = append(parts, text)
}
}
return strings.Join(parts, "\n")
case map[string]any:
if text := stringFromAny(typed["text"]); text != "" {
return text
}
if text := stringFromAny(typed["input_text"]); text != "" {
return text
}
if nested := extractText(typed["content"]); nested != "" {
return nested
}
return ""
default:
return ""
}
}
func stringFromAny(value any) string {
if value == nil {
return ""
}
switch typed := value.(type) {
case string:
return strings.TrimSpace(typed)
default:
return ""
}
}
func decodeJSON(r *http.Request, out any) error {
defer r.Body.Close()
decoder := json.NewDecoder(r.Body)
decoder.UseNumber()
if err := decoder.Decode(out); err != nil {
return fmt.Errorf("invalid JSON body: %w", err)
}
return nil
}
func writeJSON(w http.ResponseWriter, status int, data any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
_ = json.NewEncoder(w).Encode(data)
}
func writeAnthropicError(w http.ResponseWriter, status int, kind string, message string) {
writeJSON(w, status, map[string]any{
"type": "error",
"error": map[string]any{
"type": kind,
"message": message,
},
})
}
func writeOpenAIError(w http.ResponseWriter, status int, kind string, message string) {
writeJSON(w, status, map[string]any{
"error": map[string]any{
"message": message,
"type": kind,
"code": nil,
"param": nil,
},
})
}
func withCORS(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, x-api-key, anthropic-version")
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)
return
}
next.ServeHTTP(w, r)
})
}
func (s *Server) tryAcquire() bool {
select {
case s.sem <- struct{}{}:
return true
default:
return false
}
}
func (s *Server) release() {
select {
case <-s.sem:
default:
}
}

View File

@@ -0,0 +1,401 @@
package lingmaipc
import (
"bufio"
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"os"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
winio "github.com/Microsoft/go-winio"
)
const (
PipeDir = `\\.\pipe\`
PipePrefix = "lingma-"
MetaRequestID = "ai-coding/request-id"
MetaMode = "ai-coding/mode"
MetaModel = "ai-coding/model"
MetaShellType = "ai-coding/shell-type"
MetaCurrentFilePath = "ai-coding/current-file-path"
MetaEnabledMCP = "ai-coding/enabled-mcp-servers"
)
type MetaOptions struct {
RequestID string
Mode string
Model string
ShellType string
CurrentFilePath string
EnabledMCP []any
}
type Notification struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
Params map[string]any `json:"params,omitempty"`
}
type rpcError struct {
Code int `json:"code"`
Message string `json:"message"`
Data json.RawMessage `json:"data,omitempty"`
}
type responseEnvelope struct {
JSONRPC string `json:"jsonrpc"`
ID *int `json:"id,omitempty"`
Method string `json:"method,omitempty"`
Params map[string]any `json:"params,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
Error *rpcError `json:"error,omitempty"`
}
type Client struct {
conn net.Conn
reader *bufio.Reader
writeMu sync.Mutex
pendingMu sync.Mutex
pending map[int]chan responseEnvelope
subsMu sync.RWMutex
subs map[int]chan Notification
nextID atomic.Int64
nextSubID atomic.Int64
closeOnce sync.Once
closed chan struct{}
closeErr atomic.Value
}
func ResolvePipePath(explicit string) (string, error) {
if runtime.GOOS != "windows" {
return "", errors.New("Lingma IPC proxy currently requires Windows")
}
if pipe := strings.TrimSpace(explicit); pipe != "" {
return normalizePipePath(pipe), nil
}
if pipe := strings.TrimSpace(os.Getenv("LINGMA_IPC_PIPE")); pipe != "" {
return normalizePipePath(pipe), nil
}
entries, err := os.ReadDir(PipeDir)
if err != nil {
return "", fmt.Errorf("enumerate Lingma named pipes: %w", err)
}
names := make([]string, 0, len(entries))
for _, entry := range entries {
name := entry.Name()
if strings.HasPrefix(name, PipePrefix) {
names = append(names, name)
}
}
sort.Strings(names)
if len(names) == 0 {
return "", errors.New("no active Lingma named pipe was found")
}
return PipeDir + names[len(names)-1], nil
}
func normalizePipePath(pipe string) string {
if strings.HasPrefix(pipe, PipeDir) {
return pipe
}
return PipeDir + pipe
}
func DefaultShellType() string {
if shellType := strings.TrimSpace(os.Getenv("LINGMA_PROXY_SHELL_TYPE")); shellType != "" {
return shellType
}
if runtime.GOOS == "windows" {
return "powershell"
}
if shell := strings.TrimSpace(os.Getenv("SHELL")); shell != "" {
parts := strings.FieldsFunc(shell, func(r rune) bool { return r == '/' || r == '\\' })
if len(parts) > 0 {
return parts[len(parts)-1]
}
}
return "sh"
}
func CreateRequestID(prefix string) string {
if prefix == "" {
prefix = "ipc"
}
token := make([]byte, 4)
if _, err := rand.Read(token); err != nil {
return fmt.Sprintf("%s-%d", prefix, time.Now().UnixNano())
}
return fmt.Sprintf("%s-%d-%s", prefix, time.Now().UnixMilli(), hex.EncodeToString(token))
}
func CreateMeta(opts MetaOptions) map[string]any {
meta := map[string]any{
MetaRequestID: valueOr(opts.RequestID, CreateRequestID("ipc")),
MetaShellType: valueOr(opts.ShellType, DefaultShellType()),
MetaEnabledMCP: emptySliceIfNil(opts.EnabledMCP),
}
if strings.TrimSpace(opts.Mode) != "" {
meta[MetaMode] = strings.TrimSpace(opts.Mode)
}
if strings.TrimSpace(opts.Model) != "" {
meta[MetaModel] = strings.TrimSpace(opts.Model)
}
if strings.TrimSpace(opts.CurrentFilePath) != "" {
meta[MetaCurrentFilePath] = strings.TrimSpace(opts.CurrentFilePath)
}
return meta
}
func Connect(ctx context.Context, pipePath string) (*Client, error) {
if runtime.GOOS != "windows" {
return nil, errors.New("Lingma IPC proxy currently requires Windows")
}
conn, err := winio.DialPipeContext(ctx, pipePath)
if err != nil {
return nil, fmt.Errorf("connect Lingma IPC pipe %s: %w", pipePath, err)
}
client := &Client{
conn: conn,
reader: bufio.NewReader(conn),
pending: make(map[int]chan responseEnvelope),
subs: make(map[int]chan Notification),
closed: make(chan struct{}),
}
go client.readLoop()
return client, nil
}
func (c *Client) Request(ctx context.Context, method string, params any, out any) error {
if params == nil {
params = map[string]any{}
}
id := int(c.nextID.Add(1))
payload := map[string]any{
"jsonrpc": "2.0",
"id": id,
"method": method,
"params": params,
}
body, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("marshal request %s: %w", method, err)
}
responseCh := make(chan responseEnvelope, 1)
c.pendingMu.Lock()
c.pending[id] = responseCh
c.pendingMu.Unlock()
if err := c.writeFrame(body); err != nil {
c.pendingMu.Lock()
delete(c.pending, id)
c.pendingMu.Unlock()
return err
}
select {
case <-ctx.Done():
c.pendingMu.Lock()
delete(c.pending, id)
c.pendingMu.Unlock()
return ctx.Err()
case <-c.closed:
return c.closeError()
case resp := <-responseCh:
if resp.Error != nil {
return fmt.Errorf("Lingma IPC %s failed: %s", method, resp.Error.Message)
}
if out == nil || len(resp.Result) == 0 || string(resp.Result) == "null" {
return nil
}
if err := json.Unmarshal(resp.Result, out); err != nil {
return fmt.Errorf("decode %s result: %w", method, err)
}
return nil
}
}
func (c *Client) Subscribe() (<-chan Notification, func()) {
id := int(c.nextSubID.Add(1))
ch := make(chan Notification, 2048)
c.subsMu.Lock()
c.subs[id] = ch
c.subsMu.Unlock()
cancel := func() {
c.subsMu.Lock()
if sub, ok := c.subs[id]; ok {
delete(c.subs, id)
close(sub)
}
c.subsMu.Unlock()
}
return ch, cancel
}
func (c *Client) Close() error {
c.closeOnce.Do(func() {
close(c.closed)
if err := c.conn.Close(); err != nil {
c.closeErr.Store(err)
}
c.failPending(io.EOF)
c.closeAllSubs()
})
if v := c.closeErr.Load(); v != nil {
return v.(error)
}
return nil
}
func (c *Client) writeFrame(body []byte) error {
c.writeMu.Lock()
defer c.writeMu.Unlock()
frame := []byte(fmt.Sprintf("Content-Length: %d\r\n\r\n", len(body)))
if _, err := c.conn.Write(frame); err != nil {
return fmt.Errorf("write frame header: %w", err)
}
if _, err := c.conn.Write(body); err != nil {
return fmt.Errorf("write frame body: %w", err)
}
return nil
}
func (c *Client) readLoop() {
defer c.Close()
for {
body, err := c.readFrame()
if err != nil {
if !errors.Is(err, net.ErrClosed) && !errors.Is(err, io.EOF) {
c.closeErr.Store(err)
}
return
}
var envelope responseEnvelope
if err := json.Unmarshal(body, &envelope); err != nil {
c.closeErr.Store(fmt.Errorf("decode IPC frame: %w", err))
return
}
if envelope.Method != "" && envelope.ID == nil {
c.broadcast(Notification{JSONRPC: envelope.JSONRPC, Method: envelope.Method, Params: envelope.Params})
continue
}
if envelope.ID == nil {
continue
}
c.pendingMu.Lock()
ch, ok := c.pending[*envelope.ID]
if ok {
delete(c.pending, *envelope.ID)
}
c.pendingMu.Unlock()
if ok {
ch <- envelope
close(ch)
}
}
}
func (c *Client) readFrame() ([]byte, error) {
contentLength := -1
for {
line, err := c.reader.ReadString('\n')
if err != nil {
return nil, err
}
if line == "\r\n" {
break
}
line = strings.TrimSpace(line)
if strings.HasPrefix(strings.ToLower(line), "content-length:") {
raw := strings.TrimSpace(line[len("content-length:"):])
n, err := strconv.Atoi(raw)
if err != nil {
return nil, fmt.Errorf("parse content length %q: %w", raw, err)
}
contentLength = n
}
}
if contentLength < 0 {
return nil, errors.New("missing Content-Length header")
}
body := make([]byte, contentLength)
if _, err := io.ReadFull(c.reader, body); err != nil {
return nil, err
}
return body, nil
}
func (c *Client) broadcast(notification Notification) {
c.subsMu.RLock()
defer c.subsMu.RUnlock()
for _, ch := range c.subs {
ch <- notification
}
}
func (c *Client) failPending(err error) {
c.pendingMu.Lock()
defer c.pendingMu.Unlock()
for id, ch := range c.pending {
delete(c.pending, id)
ch <- responseEnvelope{Error: &rpcError{Message: err.Error()}}
close(ch)
}
}
func (c *Client) closeAllSubs() {
c.subsMu.Lock()
defer c.subsMu.Unlock()
for id, ch := range c.subs {
delete(c.subs, id)
close(ch)
}
}
func (c *Client) closeError() error {
if v := c.closeErr.Load(); v != nil {
return v.(error)
}
return io.EOF
}
func valueOr(value string, fallback string) string {
if strings.TrimSpace(value) != "" {
return strings.TrimSpace(value)
}
return fallback
}
func emptySliceIfNil(v []any) []any {
if v == nil {
return []any{}
}
return v
}

584
internal/service/service.go Normal file
View File

@@ -0,0 +1,584 @@
package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"sort"
"strings"
"sync"
"time"
"lingma-ipc-proxy/internal/lingmaipc"
)
type SessionMode string
const (
SessionModeAuto SessionMode = "auto"
SessionModeFresh SessionMode = "fresh"
SessionModeReuse SessionMode = "reuse"
)
type Config struct {
Host string
Port int
Pipe string
Cwd string
CurrentFilePath string
Mode string
ShellType string
SessionMode SessionMode
Timeout time.Duration
}
type ChatMessage struct {
Role string
Text string
}
type ChatRequest struct {
Model string
System string
Messages []ChatMessage
}
type ChatResult struct {
Text string
Model string
InputTokens int
OutputTokens int
SessionID string
RequestID string
FinishReason string
StopReason string
UsedTokens int
LimitTokens int
PipePath string
EffectiveSession SessionMode
}
type Model struct {
ID string `json:"id"`
Name string `json:"name"`
Scene string `json:"scene,omitempty"`
}
type State struct {
PipePath string `json:"pipe_path,omitempty"`
Connected bool `json:"connected"`
StickySessionID string `json:"sticky_session_id,omitempty"`
SessionMode SessionMode `json:"session_mode"`
}
type Service struct {
cfg Config
mu sync.Mutex
client *lingmaipc.Client
pipePath string
stickySessionID string
}
type promptRunResult struct {
PromptResult map[string]any
FinishData map[string]any
ContextUsage map[string]any
AssistantText string
TimedOut bool
}
func New(cfg Config) *Service {
if strings.TrimSpace(cfg.Cwd) == "" {
if wd, err := os.Getwd(); err == nil {
cfg.Cwd = wd
}
}
if strings.TrimSpace(cfg.Mode) == "" {
cfg.Mode = "agent"
}
if strings.TrimSpace(cfg.ShellType) == "" {
cfg.ShellType = lingmaipc.DefaultShellType()
}
if cfg.Timeout <= 0 {
cfg.Timeout = 120 * time.Second
}
if cfg.SessionMode == "" {
cfg.SessionMode = SessionModeAuto
}
return &Service{cfg: cfg}
}
func (s *Service) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.closeClientLocked()
}
func (s *Service) State() State {
s.mu.Lock()
defer s.mu.Unlock()
return State{
PipePath: s.pipePath,
Connected: s.client != nil,
StickySessionID: s.stickySessionID,
SessionMode: s.cfg.SessionMode,
}
}
func (s *Service) ListModels(ctx context.Context) ([]Model, error) {
s.mu.Lock()
defer s.mu.Unlock()
ipcClient, err := s.ensureConnectedLocked(ctx)
if err != nil {
return nil, err
}
var raw any
if err := ipcClient.Request(ctx, "config/queryModels", map[string]any{}, &raw); err != nil {
return nil, err
}
models := extractModels(raw)
if len(models) == 0 {
models = []Model{{ID: "lingma", Name: "Lingma", Scene: "default"}}
}
return models, nil
}
func (s *Service) Generate(ctx context.Context, req ChatRequest) (*ChatResult, error) {
s.mu.Lock()
defer s.mu.Unlock()
requestCtx, cancel := context.WithTimeout(ctx, s.cfg.Timeout)
defer cancel()
ipcClient, err := s.ensureConnectedLocked(requestCtx)
if err != nil {
return nil, err
}
effectiveMode := resolveSessionMode(req, s.cfg.SessionMode)
prompt, err := buildLingmaPrompt(req, effectiveMode)
if err != nil {
return nil, err
}
if strings.TrimSpace(prompt) == "" {
return nil, errors.New("empty user message")
}
sessionID, err := s.resolveSessionLocked(requestCtx, ipcClient, effectiveMode)
if err != nil {
return nil, err
}
requestID := lingmaipc.CreateRequestID("serve")
meta := lingmaipc.CreateMeta(lingmaipc.MetaOptions{
RequestID: requestID,
Mode: s.cfg.Mode,
Model: req.Model,
ShellType: s.cfg.ShellType,
CurrentFilePath: s.cfg.CurrentFilePath,
EnabledMCP: []any{},
})
if strings.TrimSpace(req.Model) != "" {
if err := ipcClient.Request(requestCtx, "session/set_model", map[string]any{
"sessionId": sessionID,
"modelId": strings.TrimSpace(req.Model),
"timestamp": time.Now().UnixMilli(),
"_meta": meta,
}, nil); err != nil {
if effectiveMode == SessionModeReuse {
s.stickySessionID = ""
}
return nil, err
}
}
runResult, err := s.runPromptLocked(requestCtx, ipcClient, sessionID, prompt, requestID, meta)
if err != nil {
if effectiveMode == SessionModeReuse {
s.stickySessionID = ""
}
return nil, err
}
if runResult.TimedOut || strings.TrimSpace(runResult.AssistantText) == "" {
if effectiveMode == SessionModeReuse {
s.stickySessionID = ""
}
}
if runResult.TimedOut && strings.TrimSpace(runResult.AssistantText) == "" {
return nil, errors.New("timed out while waiting for Lingma IPC to finish responding")
}
if strings.TrimSpace(runResult.AssistantText) == "" {
return nil, errors.New("Lingma IPC did not produce an assistant reply")
}
if runResult.TimedOut {
return nil, fmt.Errorf("Lingma IPC response remained incomplete before timeout. Partial reply: %s", truncate(runResult.AssistantText, 120))
}
result := &ChatResult{
Text: runResult.AssistantText,
Model: valueOr(strings.TrimSpace(req.Model), "lingma"),
InputTokens: estimateTokens(prompt),
OutputTokens: estimateTokens(runResult.AssistantText),
SessionID: sessionID,
RequestID: requestID,
FinishReason: nestedString(runResult.FinishData, "reason"),
StopReason: nestedString(runResult.PromptResult, "stopReason"),
UsedTokens: int(nestedInt64(runResult.ContextUsage, "usedTokens")),
LimitTokens: int(nestedInt64(runResult.ContextUsage, "limitTokens")),
PipePath: s.pipePath,
EffectiveSession: effectiveMode,
}
return result, nil
}
func (s *Service) ensureConnectedLocked(ctx context.Context) (*lingmaipc.Client, error) {
if s.client != nil {
pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
var pong any
if err := s.client.Request(pingCtx, "ping", map[string]any{}, &pong); err == nil {
return s.client, nil
}
_ = s.closeClientLocked()
}
pipePath, err := lingmaipc.ResolvePipePath(s.cfg.Pipe)
if err != nil {
return nil, err
}
client, err := lingmaipc.Connect(ctx, pipePath)
if err != nil {
return nil, err
}
if err := client.Request(ctx, "initialize", map[string]any{
"protocolVersion": 1,
"clientCapabilities": map[string]any{},
"timestamp": time.Now().UnixMilli(),
}, nil); err != nil {
_ = client.Close()
return nil, err
}
s.client = client
s.pipePath = pipePath
return client, nil
}
func (s *Service) closeClientLocked() error {
if s.client == nil {
s.pipePath = ""
s.stickySessionID = ""
return nil
}
client := s.client
s.client = nil
s.pipePath = ""
s.stickySessionID = ""
return client.Close()
}
func (s *Service) resolveSessionLocked(ctx context.Context, client *lingmaipc.Client, mode SessionMode) (string, error) {
if mode == SessionModeReuse && strings.TrimSpace(s.stickySessionID) != "" {
return s.stickySessionID, nil
}
var created struct {
SessionID string `json:"sessionId"`
ID string `json:"id"`
}
if err := client.Request(ctx, "session/new", map[string]any{
"cwd": s.cfg.Cwd,
"mcpServers": []any{},
"_meta": map[string]any{},
"timestamp": time.Now().UnixMilli(),
}, &created); err != nil {
return "", err
}
sessionID := strings.TrimSpace(created.SessionID)
if sessionID == "" {
sessionID = strings.TrimSpace(created.ID)
}
if sessionID == "" {
return "", errors.New("Lingma IPC did not return a sessionId")
}
if mode == SessionModeReuse {
s.stickySessionID = sessionID
}
return sessionID, nil
}
func (s *Service) runPromptLocked(
ctx context.Context,
client *lingmaipc.Client,
sessionID string,
text string,
requestID string,
meta map[string]any,
) (*promptRunResult, error) {
notifications, cancel := client.Subscribe()
defer cancel()
promptResult := map[string]any{}
if err := client.Request(ctx, "session/prompt", map[string]any{
"sessionId": sessionID,
"prompt": []map[string]any{
{"type": "text", "text": text},
},
"_meta": meta,
}, &promptResult); err != nil {
return nil, err
}
result := &promptRunResult{PromptResult: promptResult}
var builder strings.Builder
for {
select {
case <-ctx.Done():
result.AssistantText = builder.String()
result.TimedOut = true
return result, nil
case notification, ok := <-notifications:
if !ok {
result.AssistantText = builder.String()
if result.AssistantText == "" {
return nil, errors.New("Lingma IPC notification stream closed")
}
return result, nil
}
if notification.Method != "session/update" {
continue
}
if nestedStringFromMap(notification.Params, "_meta", lingmaipc.MetaRequestID) != requestID {
continue
}
update := nestedMap(notification.Params, "update")
switch nestedString(update, "sessionUpdate") {
case "agent_message_chunk":
chunk := nestedString(nestedMap(update, "content"), "text")
if chunk != "" {
builder.WriteString(chunk)
}
case "notification":
switch nestedString(update, "type") {
case "context_usage":
result.ContextUsage = nestedMap(update, "data")
case "chat_finish":
result.FinishData = nestedMap(update, "data")
result.AssistantText = builder.String()
return result, nil
}
}
}
}
}
func resolveSessionMode(req ChatRequest, configured SessionMode) SessionMode {
if configured != SessionModeAuto {
return configured
}
if strings.TrimSpace(req.System) != "" || len(filteredMessages(req.Messages)) > 1 {
return SessionModeFresh
}
return SessionModeReuse
}
func buildLingmaPrompt(req ChatRequest, mode SessionMode) (string, error) {
messages := filteredMessages(req.Messages)
var lastUser string
for i := len(messages) - 1; i >= 0; i-- {
if messages[i].Role == "user" {
lastUser = messages[i].Text
break
}
}
if strings.TrimSpace(lastUser) == "" {
return "", errors.New("no user message found in request")
}
if mode == SessionModeReuse {
return lastUser, nil
}
if strings.TrimSpace(req.System) == "" && len(messages) == 1 {
return lastUser, nil
}
parts := make([]string, 0, len(messages)+4)
if strings.TrimSpace(req.System) != "" {
parts = append(parts, "System instructions:", strings.TrimSpace(req.System))
}
parts = append(parts, "Conversation transcript:")
for _, message := range messages {
role := "User"
if message.Role == "assistant" {
role = "Assistant"
}
parts = append(parts, fmt.Sprintf("%s: %s", role, message.Text))
}
parts = append(parts, "Reply as the assistant to the latest user message only. Follow the system instructions and prior transcript naturally.")
return strings.Join(parts, "\n\n"), nil
}
func filteredMessages(messages []ChatMessage) []ChatMessage {
out := make([]ChatMessage, 0, len(messages))
for _, message := range messages {
role := strings.ToLower(strings.TrimSpace(message.Role))
text := strings.TrimSpace(message.Text)
if text == "" {
continue
}
if role != "user" && role != "assistant" {
continue
}
out = append(out, ChatMessage{Role: role, Text: text})
}
return out
}
func estimateTokens(text string) int {
text = strings.TrimSpace(text)
if text == "" {
return 1
}
return max(1, (len([]rune(text))+2)/3)
}
func extractModels(raw any) []Model {
seen := make(map[string]Model)
var walk func(scene string, value any)
walk = func(scene string, value any) {
switch typed := value.(type) {
case map[string]any:
id := firstString(typed, "id", "modelId", "key")
name := firstString(typed, "name", "label", "displayName", "title")
currentScene := scene
if currentScene == "" {
currentScene = firstString(typed, "scene", "sceneId", "category")
}
if id != "" && (name != "" || likelyModelID(id)) {
if name == "" {
name = id
}
seen[id] = Model{ID: id, Name: name, Scene: currentScene}
}
for key, child := range typed {
nextScene := currentScene
if nextScene == "" || isSceneKey(key) {
nextScene = key
}
walk(nextScene, child)
}
case []any:
for _, item := range typed {
walk(scene, item)
}
}
}
walk("", raw)
models := make([]Model, 0, len(seen))
for _, model := range seen {
models = append(models, model)
}
sort.Slice(models, func(i, j int) bool { return models[i].ID < models[j].ID })
return models
}
func likelyModelID(id string) bool {
lowered := strings.ToLower(id)
return strings.Contains(lowered, "qwen") || strings.Contains(lowered, "model") || strings.Contains(lowered, "auto") || strings.Contains(lowered, "coder")
}
func isSceneKey(key string) bool {
switch strings.ToLower(strings.TrimSpace(key)) {
case "assistant", "chat", "developer", "inline", "quest":
return true
default:
return false
}
}
func firstString(m map[string]any, keys ...string) string {
for _, key := range keys {
if value, ok := m[key]; ok {
switch typed := value.(type) {
case string:
if strings.TrimSpace(typed) != "" {
return strings.TrimSpace(typed)
}
case json.Number:
return typed.String()
}
}
}
return ""
}
func nestedMap(m map[string]any, key string) map[string]any {
if value, ok := m[key]; ok {
if typed, ok := value.(map[string]any); ok {
return typed
}
}
return map[string]any{}
}
func nestedString(m map[string]any, key string) string {
if value, ok := m[key]; ok {
switch typed := value.(type) {
case string:
return typed
case json.Number:
return typed.String()
case float64:
return fmt.Sprintf("%.0f", typed)
}
}
return ""
}
func nestedStringFromMap(m map[string]any, parent string, key string) string {
child := nestedMap(m, parent)
return nestedString(child, key)
}
func nestedInt64(m map[string]any, key string) int64 {
if value, ok := m[key]; ok {
switch typed := value.(type) {
case int:
return int64(typed)
case int64:
return typed
case float64:
return int64(typed)
case json.Number:
if n, err := typed.Int64(); err == nil {
return n
}
}
}
return 0
}
func truncate(text string, limit int) string {
runes := []rune(text)
if len(runes) <= limit {
return text
}
return string(runes[:limit])
}
func valueOr(value string, fallback string) string {
if strings.TrimSpace(value) != "" {
return strings.TrimSpace(value)
}
return fallback
}