Files
lingma-proxy-compose/internal/lingmaipc/client.go
2026-03-26 09:37:00 +08:00

354 lines
7.5 KiB
Go

package lingmaipc
import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
)
// Named-pipe naming constants.
const (
	// PipeDir is the Windows named-pipe namespace prefix.
	PipeDir = `\\.\pipe\`
	// PipePrefix is the leading part of a Lingma pipe name.
	// NOTE(review): presumably appended to PipeDir by the dialer; the use
	// site is not visible in this file.
	PipePrefix = "lingma-"
)

// Keys of the metadata map assembled by CreateMeta.
const (
	MetaRequestID       = "ai-coding/request-id"
	MetaMode            = "ai-coding/mode"
	MetaModel           = "ai-coding/model"
	MetaShellType       = "ai-coding/shell-type"
	MetaCurrentFilePath = "ai-coding/current-file-path"
	MetaEnabledMCP      = "ai-coding/enabled-mcp-servers"
)
// MetaOptions is the input to CreateMeta. Every string field is
// whitespace-trimmed before use; blank values are either replaced by a
// default or left out of the resulting map.
type MetaOptions struct {
	// RequestID is stored under MetaRequestID; a fresh ID is generated when blank.
	RequestID string
	// Mode is stored under MetaMode; omitted when blank.
	Mode string
	// Model is stored under MetaModel; omitted when blank.
	Model string
	// ShellType is stored under MetaShellType; DefaultShellType() is used when blank.
	ShellType string
	// CurrentFilePath is stored under MetaCurrentFilePath; omitted when blank.
	CurrentFilePath string
	// EnabledMCP is stored under MetaEnabledMCP; nil becomes an empty slice.
	EnabledMCP []any
}
// Notification is an incoming JSON-RPC message that names a method. The
// read loop builds one from every frame whose "method" is non-empty and
// delivers it to all subscribers.
type Notification struct {
	JSONRPC string         `json:"jsonrpc"`
	Method  string         `json:"method"`
	Params  map[string]any `json:"params,omitempty"`
}
// rpcError is the "error" member of a JSON-RPC response. Data is kept as
// raw JSON and is not decoded here.
type rpcError struct {
	Code    int             `json:"code"`
	Message string          `json:"message"`
	Data    json.RawMessage `json:"data,omitempty"`
}
// responseEnvelope is the decoding target for every incoming frame. It
// covers both shapes the read loop handles: messages with a Method
// (delivered as notifications) and replies matched to a pending request
// by ID.
type responseEnvelope struct {
	JSONRPC string `json:"jsonrpc"`
	// ID is a pointer so that a frame without an "id" can be told apart
	// from one whose id is 0.
	ID     *int            `json:"id,omitempty"`
	Method string          `json:"method,omitempty"`
	Params map[string]any  `json:"params,omitempty"`
	Result json.RawMessage `json:"result,omitempty"`
	Error  *rpcError       `json:"error,omitempty"`
}
// Client is a JSON-RPC client over a framed transport. Connect creates it
// and starts a background read loop that routes replies to waiting
// requests and fans notifications out to subscribers.
type Client struct {
	// transport is the framed connection; kind is the DialOptions.Transport
	// value Connect was called with.
	transport framedTransport
	kind      Transport

	// pending maps a request id to the channel its reply is delivered on.
	pendingMu sync.Mutex
	pending   map[int]chan responseEnvelope

	// subs maps a subscription id to its notification channel.
	subsMu sync.RWMutex
	subs   map[int]chan Notification

	// Counters for request ids and subscription ids.
	nextID, nextSubID atomic.Int64

	// closed is closed exactly once by Close; closeErr holds the error
	// recorded by Close or the read loop, if any.
	closeOnce sync.Once
	closed    chan struct{}
	closeErr  atomic.Value

	// responseMu serializes sendEmptyResponse.
	responseMu sync.Mutex
}
// DefaultShellType picks the shell name reported in request metadata.
//
// Precedence:
//  1. LINGMA_PROXY_SHELL_TYPE, whitespace-trimmed, when non-blank;
//  2. "powershell" on Windows;
//  3. the last path segment of $SHELL (split on '/' and '\');
//  4. "sh".
func DefaultShellType() string {
	override := strings.TrimSpace(os.Getenv("LINGMA_PROXY_SHELL_TYPE"))
	switch {
	case override != "":
		return override
	case runtime.GOOS == "windows":
		return "powershell"
	}

	isSeparator := func(r rune) bool { return r == '/' || r == '\\' }
	// A blank $SHELL yields no segments and falls through to "sh".
	segments := strings.FieldsFunc(strings.TrimSpace(os.Getenv("SHELL")), isSeparator)
	if n := len(segments); n > 0 {
		return segments[n-1]
	}
	return "sh"
}
// CreateRequestID returns an identifier of the form
// "<prefix>-<unix millis>-<8 hex chars>". An empty prefix is replaced by
// "ipc". If the random source fails, the result is
// "<prefix>-<unix nanos>" instead.
func CreateRequestID(prefix string) string {
	if prefix == "" {
		prefix = "ipc"
	}

	var entropy [4]byte
	_, err := rand.Read(entropy[:])
	if err != nil {
		// No randomness available: use the higher-resolution timestamp alone.
		return fmt.Sprintf("%s-%d", prefix, time.Now().UnixNano())
	}

	suffix := hex.EncodeToString(entropy[:])
	return fmt.Sprintf("%s-%d-%s", prefix, time.Now().UnixMilli(), suffix)
}
// CreateMeta builds the metadata map for an IPC request.
//
// Always present: MetaRequestID (opts.RequestID, or a generated "ipc" ID),
// MetaShellType (opts.ShellType, or DefaultShellType()) and MetaEnabledMCP
// (opts.EnabledMCP, or an empty slice when nil). MetaMode, MetaModel and
// MetaCurrentFilePath are added only when the matching option is
// non-blank after trimming.
func CreateMeta(opts MetaOptions) map[string]any {
	meta := map[string]any{
		MetaRequestID:  valueOr(opts.RequestID, CreateRequestID("ipc")),
		MetaShellType:  valueOr(opts.ShellType, DefaultShellType()),
		MetaEnabledMCP: emptySliceIfNil(opts.EnabledMCP),
	}

	optional := [...]struct{ key, raw string }{
		{MetaMode, opts.Mode},
		{MetaModel, opts.Model},
		{MetaCurrentFilePath, opts.CurrentFilePath},
	}
	for _, field := range optional {
		if trimmed := strings.TrimSpace(field.raw); trimmed != "" {
			meta[field.key] = trimmed
		}
	}
	return meta
}
// Connect dials the transport described by opts and returns a Client
// whose read loop is already running in its own goroutine. The loop ends
// when reading fails, and closes the client on exit. A dial error is
// returned unchanged.
func Connect(ctx context.Context, opts DialOptions) (*Client, error) {
	conn, err := connectTransport(ctx, opts)
	if err != nil {
		return nil, err
	}

	c := new(Client)
	c.transport = conn
	c.kind = opts.Transport
	c.pending = map[int]chan responseEnvelope{}
	c.subs = map[int]chan Notification{}
	c.closed = make(chan struct{})

	go c.readLoop()
	return c, nil
}
// Request sends a JSON-RPC call and waits for its reply.
//
// It returns when the reply arrives, ctx is done, or the client closes,
// whichever comes first. A reply carrying an error member becomes an
// error. Otherwise the result is decoded into out, unless out is nil or
// the result is absent or JSON null.
func (c *Client) Request(ctx context.Context, method string, params any, out any) error {
	frame, reqID, err := c.buildRequest(method, params)
	if err != nil {
		return err
	}

	// Register the reply slot before writing so the read loop can never see
	// a reply for an id it does not know about.
	reply := make(chan responseEnvelope, 1)
	c.pendingMu.Lock()
	c.pending[reqID] = reply
	c.pendingMu.Unlock()

	forget := func() {
		c.pendingMu.Lock()
		delete(c.pending, reqID)
		c.pendingMu.Unlock()
	}

	if writeErr := c.transport.WriteFrame(frame); writeErr != nil {
		forget()
		return writeErr
	}

	var resp responseEnvelope
	select {
	case <-ctx.Done():
		forget()
		return ctx.Err()
	case <-c.closed:
		// Close drains the pending map itself via failPending.
		return c.closeError()
	case resp = <-reply:
	}

	if resp.Error != nil {
		return fmt.Errorf("Lingma IPC %s failed: %s", method, resp.Error.Message)
	}
	if out == nil || len(resp.Result) == 0 || string(resp.Result) == "null" {
		return nil
	}
	if decodeErr := json.Unmarshal(resp.Result, out); decodeErr != nil {
		return fmt.Errorf("decode %s result: %w", method, decodeErr)
	}
	return nil
}
// Send writes a request frame without waiting for a reply. The frame
// still carries an id (buildRequest always assigns one), but no pending
// entry is registered, so the read loop discards any reply to it.
func (c *Client) Send(method string, params any) error {
	frame, _, err := c.buildRequest(method, params)
	if err == nil {
		err = c.transport.WriteFrame(frame)
	}
	return err
}
// Subscribe registers a new notification listener. It returns a channel
// buffered to 2048 entries and a cancel function that removes the
// subscription and closes the channel. Calling cancel more than once, or
// after the client has closed all subscriptions, does nothing.
func (c *Client) Subscribe() (<-chan Notification, func()) {
	subID := int(c.nextSubID.Add(1))
	feed := make(chan Notification, 2048)

	c.subsMu.Lock()
	c.subs[subID] = feed
	c.subsMu.Unlock()

	unsubscribe := func() {
		c.subsMu.Lock()
		defer c.subsMu.Unlock()

		registered, ok := c.subs[subID]
		if !ok {
			// Already cancelled or closed by closeAllSubs; closing again would panic.
			return
		}
		delete(c.subs, subID)
		close(registered)
	}
	return feed, unsubscribe
}
// Address reports the address of the underlying transport, or "" when the
// client has no transport.
func (c *Client) Address() string {
	if t := c.transport; t != nil {
		return t.Address()
	}
	return ""
}
// Transport reports the transport kind recorded when the client was
// connected (the Transport field of the DialOptions given to Connect).
func (c *Client) Transport() Transport {
	return c.kind
}
// Close shuts the client down. The first call closes the `closed`
// channel, closes the transport, fails every pending request with io.EOF
// and closes all subscriber channels; later calls skip that work.
//
// It returns the error recorded for this client, if any: either a
// read-loop failure or the transport's close error, whichever was
// recorded first. It returns nil when nothing was recorded.
func (c *Client) Close() error {
	c.closeOnce.Do(func() {
		close(c.closed)
		// atomic.Value panics when Store is given a value whose concrete type
		// differs from the one already stored, and errors from different
		// sources have different concrete types. The read loop may already
		// have recorded its own failure, so only record the transport's close
		// error when nothing is stored yet. This also keeps the earlier error,
		// which is the more likely root cause.
		// NOTE: Load followed by Store is not one atomic step, so a small
		// window remains if another goroutine stores concurrently.
		if err := c.transport.Close(); err != nil && c.closeErr.Load() == nil {
			c.closeErr.Store(err)
		}
		c.failPending(io.EOF)
		c.closeAllSubs()
	})
	if v := c.closeErr.Load(); v != nil {
		return v.(error)
	}
	return nil
}
// buildRequest allocates the next request id and encodes a JSON-RPC 2.0
// request for method. Nil params are sent as an empty object. It returns
// the encoded frame and the id; on an encoding failure the id is
// reported as 0, although the counter has already advanced.
func (c *Client) buildRequest(method string, params any) ([]byte, int, error) {
	// Fields are declared in sorted key order so the output matches the
	// bytes produced by encoding a map with the same keys.
	type requestFrame struct {
		ID      int    `json:"id"`
		JSONRPC string `json:"jsonrpc"`
		Method  string `json:"method"`
		Params  any    `json:"params"`
	}

	if params == nil {
		params = map[string]any{}
	}
	frame := requestFrame{
		ID:      int(c.nextID.Add(1)),
		JSONRPC: "2.0",
		Method:  method,
		Params:  params,
	}

	encoded, err := json.Marshal(frame)
	if err != nil {
		return nil, 0, fmt.Errorf("marshal request %s: %w", method, err)
	}
	return encoded, frame.ID, nil
}
// readLoop reads frames from the transport until reading or decoding
// fails, then closes the client (deferred).
//
// Each frame is handled as follows:
//   - non-empty method: delivered to all subscribers; if it also carries
//     an id, an empty result is sent back (best effort, write errors
//     ignored);
//   - no method and no id: ignored;
//   - otherwise: handed to the pending request with that id, if one is
//     still registered.
//
// A read error other than io.EOF, or a decode error, is recorded as the
// client's close error unless one is already recorded.
func (c *Client) readLoop() {
	defer c.Close()

	// recordErr keeps the first failure only. atomic.Value panics when Store
	// is given a value whose concrete type differs from the one already
	// stored, and Close may already have recorded a transport error of a
	// different type.
	// NOTE: Load followed by Store is not one atomic step, so a small window
	// remains if Close stores concurrently.
	recordErr := func(err error) {
		if c.closeErr.Load() == nil {
			c.closeErr.Store(err)
		}
	}

	for {
		body, err := c.transport.ReadFrame()
		if err != nil {
			// io.EOF is not recorded; closeError falls back to io.EOF when
			// nothing is stored.
			if !errors.Is(err, io.EOF) {
				recordErr(err)
			}
			return
		}

		var envelope responseEnvelope
		if err := json.Unmarshal(body, &envelope); err != nil {
			recordErr(fmt.Errorf("decode IPC frame: %w", err))
			return
		}

		if envelope.Method != "" {
			c.broadcast(Notification{JSONRPC: envelope.JSONRPC, Method: envelope.Method, Params: envelope.Params})
			if envelope.ID != nil {
				// The peer expects a reply to its request; failing to send
				// one is deliberately not fatal for the loop.
				_ = c.sendEmptyResponse(*envelope.ID)
			}
			continue
		}
		if envelope.ID == nil {
			continue
		}

		c.pendingMu.Lock()
		ch, ok := c.pending[*envelope.ID]
		if ok {
			delete(c.pending, *envelope.ID)
		}
		c.pendingMu.Unlock()
		if ok {
			// ch has capacity 1 and was just removed from the map, so this
			// is the only send and it cannot block.
			ch <- envelope
			close(ch)
		}
	}
}
// sendEmptyResponse writes a JSON-RPC reply for id with a null result.
// Calls are serialized by responseMu.
func (c *Client) sendEmptyResponse(id int) error {
	c.responseMu.Lock()
	defer c.responseMu.Unlock()

	ack := map[string]any{"jsonrpc": "2.0", "id": id, "result": nil}
	frame, err := json.Marshal(ack)
	if err == nil {
		err = c.transport.WriteFrame(frame)
	}
	return err
}
// broadcast delivers notification to every current subscriber while
// holding subsMu for reading.
//
// A subscriber with free buffer space always receives the notification.
// When a subscriber's buffer is full the call waits for space, but stops
// waiting once the client is closed. Without that escape, Close would
// block forever in closeAllSubs, which needs the write lock held up by
// this read lock.
//
// NOTE: while the client is open, a subscriber that stops draining a full
// channel still stalls the read loop, and its cancel function waits on
// subsMu until space frees up.
func (c *Client) broadcast(notification Notification) {
	c.subsMu.RLock()
	defer c.subsMu.RUnlock()
	for _, ch := range c.subs {
		// Fast path: deliver whenever there is room, even during shutdown,
		// so delivery matches the plain blocking send in every case that
		// does not block.
		select {
		case ch <- notification:
			continue
		default:
		}
		// Slow path: the buffer is full; wait for space or for Close.
		select {
		case ch <- notification:
		case <-c.closed:
			return
		}
	}
}
// failPending completes every in-flight request with an error envelope
// whose message is err.Error(), closes its reply channel and empties the
// pending map.
func (c *Client) failPending(err error) {
	c.pendingMu.Lock()
	defer c.pendingMu.Unlock()

	for reqID, waiter := range c.pending {
		failure := responseEnvelope{Error: &rpcError{Message: err.Error()}}
		delete(c.pending, reqID)
		// Reply channels have capacity 1 and receive a single send, so this
		// does not block while the lock is held.
		waiter <- failure
		close(waiter)
	}
}
// closeAllSubs removes every subscription and closes its channel, so
// receivers see the channel close once buffered notifications are drained.
func (c *Client) closeAllSubs() {
	c.subsMu.Lock()
	defer c.subsMu.Unlock()

	for subID, feed := range c.subs {
		close(feed)
		delete(c.subs, subID)
	}
}
// closeError returns the error recorded for the closed client, or io.EOF
// when none was recorded.
func (c *Client) closeError() error {
	// A nil Load result fails the assertion, which selects the io.EOF default.
	if recorded, ok := c.closeErr.Load().(error); ok {
		return recorded
	}
	return io.EOF
}
// valueOr returns value with surrounding whitespace removed, or fallback
// (unchanged) when value is empty or whitespace only.
func valueOr(value string, fallback string) string {
	trimmed := strings.TrimSpace(value)
	if trimmed == "" {
		return fallback
	}
	return trimmed
}
// emptySliceIfNil returns v unchanged when it is non-nil, and a new empty
// non-nil slice otherwise, so that JSON encoding yields [] rather than
// null.
func emptySliceIfNil(v []any) []any {
	if v != nil {
		return v
	}
	return make([]any, 0)
}