diff --git a/internal/service/service.go b/internal/service/service.go
index f018115..f2da1ee 100644
--- a/internal/service/service.go
+++ b/internal/service/service.go
@@ -229,7 +229,7 @@ func (s *Service) ListModels(ctx context.Context) ([]Model, error) {
 }
 
 func (s *Service) Generate(ctx context.Context, req ChatRequest) (*ChatResult, error) {
-	return s.generateLocked(ctx, req, nil)
+	return s.generateWithReconnect(ctx, req, nil)
 }
 
 func (s *Service) GenerateStream(ctx context.Context, req ChatRequest) (<-chan StreamEvent, <-chan StreamResult, error) {
@@ -237,7 +237,7 @@ func (s *Service) GenerateStream(ctx context.Context, req ChatRequest) (<-chan S
 	done := make(chan StreamResult, 1)
 
 	go func() {
-		result, err := s.generateLocked(ctx, req, func(delta string) {
+		result, err := s.generateWithReconnect(ctx, req, func(delta string) {
 			if delta == "" {
 				return
 			}
@@ -255,6 +255,46 @@ func (s *Service) GenerateStream(ctx context.Context, req ChatRequest) (<-chan S
 	return events, done, nil
 }
 
+// generateWithReconnect runs generateLocked and, when the first attempt fails
+// with an error that isRecoverableIPCError accepts, closes the current client
+// through resetConnection and runs generateLocked one more time.
+//
+// The retry is skipped when ctx is already done, or when a non-empty delta has
+// already been forwarded to onDelta: a second attempt restarts the response
+// from the beginning, so the stream consumer would see the opening text twice.
+func (s *Service) generateWithReconnect(
+	ctx context.Context,
+	req ChatRequest,
+	onDelta func(string),
+) (*ChatResult, error) {
+	// A one-slot channel serves as a set-once flag that stays safe even if
+	// generateLocked invokes the callback from another goroutine.
+	streamed := make(chan struct{}, 1)
+	firstAttempt := onDelta
+	if onDelta != nil {
+		firstAttempt = func(delta string) {
+			if delta != "" {
+				select {
+				case streamed <- struct{}{}:
+				default:
+				}
+			}
+			onDelta(delta)
+		}
+	}
+
+	result, err := s.generateLocked(ctx, req, firstAttempt)
+	if err == nil || !isRecoverableIPCError(err) {
+		return result, err
+	}
+	if ctx.Err() != nil || len(streamed) > 0 {
+		return result, err
+	}
+
+	s.resetConnection()
+	return s.generateLocked(ctx, req, onDelta)
+}
+
 func (s *Service) generateLocked(
 	ctx context.Context,
 	req ChatRequest,
@@ -388,6 +428,32 @@ func shouldRetryTooling(choice toolemulation.ToolChoice, text string) bool {
 	return toolemulation.LooksLikeRefusal(text) || toolemulation.LooksLikeMissedToolUse(text)
 }
 
+// isRecoverableIPCError reports whether err looks like a dropped or closed IPC
+// transport, judged by a case-insensitive substring match on err.Error().
+// A nil error is never recoverable.
+func isRecoverableIPCError(err error) bool {
+	if err == nil {
+		return false
+	}
+	text := strings.ToLower(err.Error())
+	needles := []string{
+		"use of closed network connection",
+		"broken pipe",
+		"connection reset by peer",
+		"connection refused",
+		"websocket: close",
+		"unexpected eof",
+		"io: read/write on closed pipe",
+		"lingma ipc notification stream closed",
+	}
+	for _, needle := range needles {
+		if strings.Contains(text, needle) {
+			return true
+		}
+	}
+	return false
+}
+
 func (s *Service) buildChatResult(
 	req ChatRequest,
 	sessionID string,
@@ -467,6 +533,15 @@ func (s *Service) closeClientLocked() error {
 	return client.Close()
 }
 
+// resetConnection closes the current client while holding s.mu.
+func (s *Service) resetConnection() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	// Best effort: the connection is already considered broken, so a Close
+	// failure is deliberately ignored.
+	_ = s.closeClientLocked()
+}
+
 func (s *Service) resolveSession(ctx context.Context, client *lingmaipc.Client, mode SessionMode) (string, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
diff --git a/internal/service/service_test.go b/internal/service/service_test.go
new file mode 100644
index 0000000..3c0b022
--- /dev/null
+++ b/internal/service/service_test.go
@@ -0,0 +1,31 @@
+package service
+
+import (
+	"errors"
+	"testing"
+)
+
+func TestIsRecoverableIPCError(t *testing.T) {
+	cases := []error{
+		errors.New("write websocket frame: write tcp 127.0.0.1:64954->127.0.0.1:36510: use of closed network connection"),
+		errors.New("broken pipe"),
+		errors.New("Lingma IPC notification stream closed"),
+	}
+	for _, err := range cases {
+		if !isRecoverableIPCError(err) {
+			t.Fatalf("expected recoverable error: %v", err)
+		}
+	}
+}
+
+func TestIsRecoverableIPCErrorIgnoresModelErrors(t *testing.T) {
+	if isRecoverableIPCError(errors.New("timed out while waiting for Lingma IPC to finish responding")) {
+		t.Fatal("timeout should not be treated as an immediate reconnect retry")
+	}
+}
+
+func TestIsRecoverableIPCErrorNil(t *testing.T) {
+	if isRecoverableIPCError(nil) {
+		t.Fatal("nil error must not be reported as recoverable")
+	}
+}