fix: reconnect stale Lingma IPC sessions
This commit is contained in:
@@ -229,7 +229,7 @@ func (s *Service) ListModels(ctx context.Context) ([]Model, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Generate(ctx context.Context, req ChatRequest) (*ChatResult, error) {
|
func (s *Service) Generate(ctx context.Context, req ChatRequest) (*ChatResult, error) {
|
||||||
return s.generateLocked(ctx, req, nil)
|
return s.generateWithReconnect(ctx, req, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) GenerateStream(ctx context.Context, req ChatRequest) (<-chan StreamEvent, <-chan StreamResult, error) {
|
func (s *Service) GenerateStream(ctx context.Context, req ChatRequest) (<-chan StreamEvent, <-chan StreamResult, error) {
|
||||||
@@ -237,7 +237,7 @@ func (s *Service) GenerateStream(ctx context.Context, req ChatRequest) (<-chan S
|
|||||||
done := make(chan StreamResult, 1)
|
done := make(chan StreamResult, 1)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
result, err := s.generateLocked(ctx, req, func(delta string) {
|
result, err := s.generateWithReconnect(ctx, req, func(delta string) {
|
||||||
if delta == "" {
|
if delta == "" {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -255,6 +255,20 @@ func (s *Service) GenerateStream(ctx context.Context, req ChatRequest) (<-chan S
|
|||||||
return events, done, nil
|
return events, done, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Service) generateWithReconnect(
|
||||||
|
ctx context.Context,
|
||||||
|
req ChatRequest,
|
||||||
|
onDelta func(string),
|
||||||
|
) (*ChatResult, error) {
|
||||||
|
result, err := s.generateLocked(ctx, req, onDelta)
|
||||||
|
if err == nil || !isRecoverableIPCError(err) {
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.resetConnection()
|
||||||
|
return s.generateLocked(ctx, req, onDelta)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Service) generateLocked(
|
func (s *Service) generateLocked(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
req ChatRequest,
|
req ChatRequest,
|
||||||
@@ -388,6 +402,29 @@ func shouldRetryTooling(choice toolemulation.ToolChoice, text string) bool {
|
|||||||
return toolemulation.LooksLikeRefusal(text) || toolemulation.LooksLikeMissedToolUse(text)
|
return toolemulation.LooksLikeRefusal(text) || toolemulation.LooksLikeMissedToolUse(text)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isRecoverableIPCError(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
text := strings.ToLower(err.Error())
|
||||||
|
needles := []string{
|
||||||
|
"use of closed network connection",
|
||||||
|
"broken pipe",
|
||||||
|
"connection reset by peer",
|
||||||
|
"connection refused",
|
||||||
|
"websocket: close",
|
||||||
|
"unexpected eof",
|
||||||
|
"io: read/write on closed pipe",
|
||||||
|
"lingma ipc notification stream closed",
|
||||||
|
}
|
||||||
|
for _, needle := range needles {
|
||||||
|
if strings.Contains(text, needle) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Service) buildChatResult(
|
func (s *Service) buildChatResult(
|
||||||
req ChatRequest,
|
req ChatRequest,
|
||||||
sessionID string,
|
sessionID string,
|
||||||
@@ -467,6 +504,12 @@ func (s *Service) closeClientLocked() error {
|
|||||||
return client.Close()
|
return client.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Service) resetConnection() {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
_ = s.closeClientLocked()
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Service) resolveSession(ctx context.Context, client *lingmaipc.Client, mode SessionMode) (string, error) {
|
func (s *Service) resolveSession(ctx context.Context, client *lingmaipc.Client, mode SessionMode) (string, error) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|||||||
25
internal/service/service_test.go
Normal file
25
internal/service/service_test.go
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestIsRecoverableIPCError(t *testing.T) {
|
||||||
|
cases := []error{
|
||||||
|
errors.New("write websocket frame: write tcp 127.0.0.1:64954->127.0.0.1:36510: use of closed network connection"),
|
||||||
|
errors.New("broken pipe"),
|
||||||
|
errors.New("Lingma IPC notification stream closed"),
|
||||||
|
}
|
||||||
|
for _, err := range cases {
|
||||||
|
if !isRecoverableIPCError(err) {
|
||||||
|
t.Fatalf("expected recoverable error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIsRecoverableIPCErrorIgnoresModelErrors(t *testing.T) {
|
||||||
|
if isRecoverableIPCError(errors.New("timed out while waiting for Lingma IPC to finish responding")) {
|
||||||
|
t.Fatal("timeout should not be treated as an immediate reconnect retry")
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user