perf: reduce IPC setup overhead

This commit is contained in:
coolxll
2026-03-25 23:17:06 +08:00
parent dc04e425c6
commit e5d1134502
2 changed files with 82 additions and 25 deletions

View File

@@ -36,6 +36,14 @@ func main() {
addr := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) addr := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
svc := service.New(cfg) svc := service.New(cfg)
warmupCtx, warmupCancel := context.WithTimeout(context.Background(), 10*time.Second)
if err := svc.Warmup(warmupCtx); err != nil {
log.Printf("warmup failed: %v", err)
} else {
log.Printf("Lingma IPC warmup completed")
}
warmupCancel()
server := httpapi.NewServer(addr, svc) server := httpapi.NewServer(addr, svc)
log.Printf("lingma-ipc-proxy listening on http://%s", addr) log.Printf("lingma-ipc-proxy listening on http://%s", addr)

View File

@@ -88,6 +88,7 @@ type Service struct {
client *lingmaipc.Client client *lingmaipc.Client
pipePath string pipePath string
stickySessionID string stickySessionID string
stickyModelID string
} }
type promptRunResult struct { type promptRunResult struct {
@@ -119,6 +120,11 @@ func New(cfg Config) *Service {
return &Service{cfg: cfg} return &Service{cfg: cfg}
} }
func (s *Service) Warmup(ctx context.Context) error {
_, err := s.ensureConnected(ctx)
return err
}
func (s *Service) Close() error { func (s *Service) Close() error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@@ -137,10 +143,7 @@ func (s *Service) State() State {
} }
func (s *Service) ListModels(ctx context.Context) ([]Model, error) { func (s *Service) ListModels(ctx context.Context) ([]Model, error) {
s.mu.Lock() ipcClient, err := s.ensureConnected(ctx)
defer s.mu.Unlock()
ipcClient, err := s.ensureConnectedLocked(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -158,8 +161,6 @@ func (s *Service) ListModels(ctx context.Context) ([]Model, error) {
} }
func (s *Service) Generate(ctx context.Context, req ChatRequest) (*ChatResult, error) { func (s *Service) Generate(ctx context.Context, req ChatRequest) (*ChatResult, error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.generateLocked(ctx, req, nil) return s.generateLocked(ctx, req, nil)
} }
@@ -168,7 +169,6 @@ func (s *Service) GenerateStream(ctx context.Context, req ChatRequest) (<-chan S
done := make(chan StreamResult, 1) done := make(chan StreamResult, 1)
go func() { go func() {
s.mu.Lock()
result, err := s.generateLocked(ctx, req, func(delta string) { result, err := s.generateLocked(ctx, req, func(delta string) {
if delta == "" { if delta == "" {
return return
@@ -178,7 +178,6 @@ func (s *Service) GenerateStream(ctx context.Context, req ChatRequest) (<-chan S
case <-ctx.Done(): case <-ctx.Done():
} }
}) })
s.mu.Unlock()
close(events) close(events)
done <- StreamResult{Result: result, Err: err} done <- StreamResult{Result: result, Err: err}
@@ -196,7 +195,7 @@ func (s *Service) generateLocked(
requestCtx, cancel := context.WithTimeout(ctx, s.cfg.Timeout) requestCtx, cancel := context.WithTimeout(ctx, s.cfg.Timeout)
defer cancel() defer cancel()
ipcClient, err := s.ensureConnectedLocked(requestCtx) ipcClient, err := s.ensureConnected(requestCtx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -210,7 +209,7 @@ func (s *Service) generateLocked(
return nil, errors.New("empty user message") return nil, errors.New("empty user message")
} }
sessionID, err := s.resolveSessionLocked(requestCtx, ipcClient, effectiveMode) sessionID, err := s.resolveSession(requestCtx, ipcClient, effectiveMode)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -233,30 +232,32 @@ func (s *Service) generateLocked(
EnabledMCP: []any{}, EnabledMCP: []any{},
}) })
if strings.TrimSpace(req.Model) != "" { modelID := strings.TrimSpace(req.Model)
if modelID != "" && s.shouldSetModel(sessionID, effectiveMode, modelID) {
if err := ipcClient.Request(requestCtx, "session/set_model", map[string]any{ if err := ipcClient.Request(requestCtx, "session/set_model", map[string]any{
"sessionId": sessionID, "sessionId": sessionID,
"modelId": strings.TrimSpace(req.Model), "modelId": modelID,
"timestamp": time.Now().UnixMilli(), "timestamp": time.Now().UnixMilli(),
"_meta": meta, "_meta": meta,
}, nil); err != nil { }, nil); err != nil {
if effectiveMode == SessionModeReuse { if effectiveMode == SessionModeReuse {
s.stickySessionID = "" s.invalidateStickySession()
} }
return nil, err return nil, err
} }
s.rememberStickyModel(sessionID, modelID)
} }
runResult, err := s.runPromptLocked(requestCtx, ipcClient, sessionID, prompt, requestID, meta, onDelta) runResult, err := s.runPromptLocked(requestCtx, ipcClient, sessionID, prompt, requestID, meta, onDelta)
if err != nil { if err != nil {
if effectiveMode == SessionModeReuse { if effectiveMode == SessionModeReuse {
s.stickySessionID = "" s.invalidateStickySession()
} }
return nil, err return nil, err
} }
if runResult.TimedOut || strings.TrimSpace(runResult.AssistantText) == "" { if runResult.TimedOut || strings.TrimSpace(runResult.AssistantText) == "" {
if effectiveMode == SessionModeReuse { if effectiveMode == SessionModeReuse {
s.stickySessionID = "" s.invalidateStickySession()
} }
} }
if runResult.TimedOut && strings.TrimSpace(runResult.AssistantText) == "" { if runResult.TimedOut && strings.TrimSpace(runResult.AssistantText) == "" {
@@ -292,21 +293,21 @@ func (s *Service) buildChatResult(
StopReason: nestedString(runResult.PromptResult, "stopReason"), StopReason: nestedString(runResult.PromptResult, "stopReason"),
UsedTokens: int(nestedInt64(runResult.ContextUsage, "usedTokens")), UsedTokens: int(nestedInt64(runResult.ContextUsage, "usedTokens")),
LimitTokens: int(nestedInt64(runResult.ContextUsage, "limitTokens")), LimitTokens: int(nestedInt64(runResult.ContextUsage, "limitTokens")),
PipePath: s.pipePath, PipePath: s.currentPipePath(),
EffectiveSession: effectiveMode, EffectiveSession: effectiveMode,
} }
} }
func (s *Service) ensureConnected(ctx context.Context) (*lingmaipc.Client, error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.ensureConnectedLocked(ctx)
}
func (s *Service) ensureConnectedLocked(ctx context.Context) (*lingmaipc.Client, error) { func (s *Service) ensureConnectedLocked(ctx context.Context) (*lingmaipc.Client, error) {
if s.client != nil { if s.client != nil {
pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
var pong any
if err := s.client.Request(pingCtx, "ping", map[string]any{}, &pong); err == nil {
return s.client, nil return s.client, nil
} }
_ = s.closeClientLocked()
}
pipePath, err := lingmaipc.ResolvePipePath(s.cfg.Pipe) pipePath, err := lingmaipc.ResolvePipePath(s.cfg.Pipe)
if err != nil { if err != nil {
@@ -333,16 +334,63 @@ func (s *Service) ensureConnectedLocked(ctx context.Context) (*lingmaipc.Client,
func (s *Service) closeClientLocked() error { func (s *Service) closeClientLocked() error {
if s.client == nil { if s.client == nil {
s.pipePath = "" s.pipePath = ""
s.stickySessionID = "" s.clearStickyLocked()
return nil return nil
} }
client := s.client client := s.client
s.client = nil s.client = nil
s.pipePath = "" s.pipePath = ""
s.stickySessionID = "" s.clearStickyLocked()
return client.Close() return client.Close()
} }
func (s *Service) resolveSession(ctx context.Context, client *lingmaipc.Client, mode SessionMode) (string, error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.resolveSessionLocked(ctx, client, mode)
}
func (s *Service) invalidateStickySession() {
s.mu.Lock()
defer s.mu.Unlock()
s.clearStickyLocked()
}
func (s *Service) rememberStickyModel(sessionID string, modelID string) {
s.mu.Lock()
defer s.mu.Unlock()
if strings.TrimSpace(s.stickySessionID) == strings.TrimSpace(sessionID) {
s.stickyModelID = strings.TrimSpace(modelID)
}
}
func (s *Service) shouldSetModel(sessionID string, mode SessionMode, modelID string) bool {
if strings.TrimSpace(modelID) == "" {
return false
}
if mode != SessionModeReuse {
return true
}
s.mu.Lock()
defer s.mu.Unlock()
if strings.TrimSpace(s.stickySessionID) != strings.TrimSpace(sessionID) {
return true
}
return strings.TrimSpace(s.stickyModelID) != strings.TrimSpace(modelID)
}
func (s *Service) clearStickyLocked() {
s.stickySessionID = ""
s.stickyModelID = ""
}
func (s *Service) currentPipePath() string {
s.mu.Lock()
defer s.mu.Unlock()
return s.pipePath
}
func (s *Service) resolveSessionLocked(ctx context.Context, client *lingmaipc.Client, mode SessionMode) (string, error) { func (s *Service) resolveSessionLocked(ctx context.Context, client *lingmaipc.Client, mode SessionMode) (string, error) {
if mode == SessionModeReuse && strings.TrimSpace(s.stickySessionID) != "" { if mode == SessionModeReuse && strings.TrimSpace(s.stickySessionID) != "" {
return s.stickySessionID, nil return s.stickySessionID, nil
@@ -371,6 +419,7 @@ func (s *Service) resolveSessionLocked(ctx context.Context, client *lingmaipc.Cl
if mode == SessionModeReuse { if mode == SessionModeReuse {
s.stickySessionID = sessionID s.stickySessionID = sessionID
s.stickyModelID = ""
} }
return sessionID, nil return sessionID, nil
} }