from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Awaitable, Callable

from ..lingma_pool import LingmaPool, PoolInstance
from ..model_map import build_model_name_map, flatten_model_keys, resolve_model
from ..session_cache import SessionCache, hash_branch_context


@dataclass
class ExecutionContext:
    """Bundle of values produced by ``prepare_execution_context``."""

    # "agent", or the caller-supplied default ask mode.
    ask_mode: str
    # Cache key under which a previous session was searched; None when
    # session reuse was not attempted.
    lookup_key: str | None
    # Cache key built from the full message list; None when session reuse
    # was not attempted.
    write_key: str | None
    # Session id taken from the cache entry; None on a miss or after the
    # entry was invalidated.
    cached_session_id: str | None
    # Pool instance returned by ``pool.pick``.
    inst: PoolInstance
    # Model name returned by ``resolve_model``.
    model: str
    # Text to send: only the last user text when replying into a cached
    # session, otherwise the prompt built from all messages.
    prompt: str
    # True when ``cached_session_id`` is truthy.
    is_reply: bool
    # Affinity key that was handed to ``pool.pick``.
    affinity: str | None


def _resolve_ask_mode(model: str, has_tooling_context: bool, *, default_ask_mode: str) -> str:
    """Return ``"agent"`` for agent model names or tooling requests.

    The model name is compared case-insensitively against ``"lingma-agent"``
    and ``"agent"``; a falsy ``model`` is treated as the empty string.  In
    every other case ``default_ask_mode`` is returned unchanged.
    """
    normalized = (model or "").lower()
    agent_requested = normalized in ("lingma-agent", "agent")
    return "agent" if (agent_requested or has_tooling_context) else default_ask_mode


async def _apply_cached_instance_or_invalidate(
    *,
    protocol: str,
    logger: Any,
    session_cache: SessionCache,
    inst: PoolInstance,
    cached_instance_name: str | None,
    cached_session_id: str | None,
    lookup_key: str | None,
) -> str | None:
    """Keep the cached session id only if the picked instance matches.

    When a cached instance name exists and differs from ``inst.name``, the
    mismatch is logged, the cache entry under ``lookup_key`` (if any) is
    invalidated, and ``None`` is returned.  Otherwise ``cached_session_id``
    is returned as given.
    """
    picked_other_instance = bool(cached_instance_name) and inst.name != cached_instance_name
    if not picked_other_instance:
        return cached_session_id

    logger.info(
        "%s session cache instance %s unhealthy, falling back to %s",
        protocol,
        cached_instance_name,
        inst.name,
    )
    if lookup_key:
        await session_cache.invalidate(lookup_key)
    return None


async def prepare_execution_context(
    *,
    protocol: str,
    requested_model: str,
    has_tooling_context: bool,
    tool_config: dict[str, Any] | None,
    messages_dump: list[dict[str, Any]],
    api_key: str,
    affinity_key: str | None,
    pool: LingmaPool,
    session_cache: SessionCache,
    logger: Any,
    default_model: str,
    default_ask_mode: str,
    ensure_instance_logged_in: Callable[[PoolInstance], Awaitable[Any]],
    last_user_text: Callable[[list[dict[str, Any]]], str],
    messages_to_prompt: Callable[[list[dict[str, Any]]], str],
) -> ExecutionContext:
    """Resolve ask mode, session reuse, pool instance, model and prompt.

    Steps, in order:

    1. Derive the ask mode from the requested model and tooling flag.
    2. If the session cache is enabled, the mode is ``"chat"``, there are at
       least two messages and no tooling context, build a lookup key from
       all messages but the last and a write key from all messages, then
       query the cache (retrying with a key built without branch context).
    3. Pick a pool instance, preferring the instance recorded in the cache
       entry over ``affinity_key``; drop the cached session if a different
       instance was picked.
    4. Ensure the instance is logged in, query its models and resolve the
       model name.
    5. Use only the last user text as the prompt when a cached session is
       reused, otherwise the prompt built from every message.

    Returns:
        An ``ExecutionContext`` populated with the values above.
    """
    ask_mode = _resolve_ask_mode(
        requested_model,
        has_tooling_context,
        default_ask_mode=default_ask_mode,
    )

    lookup_key: str | None = None
    write_key: str | None = None
    session_id: str | None = None
    pinned_instance: str | None = None

    can_reuse_session = (
        session_cache.enabled
        and ask_mode == "chat"
        and len(messages_dump) >= 2
        and not has_tooling_context
    )

    if can_reuse_session:
        # The lookup key describes the conversation *before* the newest
        # message; the write key describes it including that message.
        prefix_hash = hash_branch_context(messages_dump[:-1])
        lookup_key = session_cache.build_key(
            api_key,
            messages_dump[:-1],
            tool_config=tool_config,
            branch_context=prefix_hash,
        )
        full_hash = hash_branch_context(messages_dump)
        write_key = session_cache.build_key(
            api_key,
            messages_dump,
            tool_config=tool_config,
            branch_context=full_hash,
        )

        hit = await session_cache.get(lookup_key)
        if hit is None:
            # Retry with a key that carries no branch context.
            legacy_key = session_cache.build_key(
                api_key,
                messages_dump[:-1],
                tool_config=tool_config,
            )
            hit = await session_cache.get(legacy_key)
            if hit is not None:
                # Later invalidation must target the key that actually hit.
                lookup_key = legacy_key

        if hit is not None:
            session_id, pinned_instance = hit.session_id, hit.instance_name or None

    # Prefer the instance recorded in the cache entry over the caller's key.
    affinity = pinned_instance or affinity_key
    inst = pool.pick(affinity_key=affinity)

    session_id = await _apply_cached_instance_or_invalidate(
        protocol=protocol,
        logger=logger,
        session_cache=session_cache,
        inst=inst,
        cached_instance_name=pinned_instance,
        cached_session_id=session_id,
        lookup_key=lookup_key,
    )

    await ensure_instance_logged_in(inst)

    models = await inst.client.query_models()
    model = resolve_model(
        requested_model,
        flatten_model_keys(models),
        default_model,
        build_model_name_map(models),
    )

    # A surviving session id means only the newest user text is sent.
    is_reply = bool(session_id)
    prompt = last_user_text(messages_dump) if is_reply else messages_to_prompt(messages_dump)

    return ExecutionContext(
        ask_mode=ask_mode,
        lookup_key=lookup_key,
        write_key=write_key,
        cached_session_id=session_id,
        inst=inst,
        model=model,
        prompt=prompt,
        is_reply=is_reply,
        affinity=affinity,
    )