"""Session loop: alternate LLM completions with sandbox tool execution.
The single public entry point :func:`run_session_loop` drives a multi-turn
assistant pass. Two optional parameters extend it:
* ``on_event`` — receives one :class:`~rath.llm.RathLLMStreamDelta` per
streamed chunk. Requires the resolved chat client to satisfy
:class:`~rath.llm.StreamingChatClient`. When
``on_event`` is ``None`` (the default), the loop runs non-streaming.
* ``persist`` / ``persist_path`` — when truthy, the loop holds a
:class:`~rath.session.persistence.SessionWriter` internally and appends each
new chunk to ``.openrath/sessions/<out.id>.jsonl`` (or to ``persist_path``).
The writer's trailer is written on graceful return; on exception the file
is left without a trailer (``closed=False`` on reload — the crash signal).
"""
from __future__ import annotations
import json
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any, Mapping, Protocol, runtime_checkable
from pydantic import BaseModel
from rath.backend import (
CodeResult,
CommandResult,
FileContent,
FileEntries,
FileWriteResult,
ToolExecutionFailure,
ToolResult,
)
from rath.flow.tool import (
FlowToolCall,
)
from rath.llm import (
Provider,
RathLLMAssistantMessage,
RathLLMChatChoice,
RathLLMChatRequest,
RathLLMChatResponse,
RathLLMFinishReason,
RathLLMFunctionTool,
RathLLMStreamDelta,
RathLLMTokenUsage,
RathLLMToolCallFunction,
RathLLMToolCallPart,
StreamingChatClient,
add_usage,
chat_client_for,
)
from rath.llm.tool_args import parse_tool_arguments
from rath.session.chunk import (
ChunkRow,
ChunkTable,
)
from rath.session.graph import LineageKind, LineageRecorder, SessionLineage
from rath.session.manager import session_registry
from rath.session.persistence import SessionWriter
from rath.session.provider_builtin import DefaultSessionLoopExecutor
from rath.session.session import Session
from rath.utils.decoding import decode_subprocess_output
logger = logging.getLogger(__name__)
OnEventCb = Callable[[RathLLMStreamDelta], None]
"""Type alias for the streaming-delta callback consumed by :func:`run_session_loop`."""
[docs]
@runtime_checkable
class SessionLoopExecutor(Protocol):
"""Runs completions and tool dispatch used by ``run_session_loop``."""
[docs]
def complete(self, req: RathLLMChatRequest) -> RathLLMChatResponse:
"""Run one chat completion."""
def _accumulate_stream_to_response(
deltas: Any,
*,
on_event: Callable[[RathLLMStreamDelta], None],
model: str = "",
) -> RathLLMChatResponse:
"""Fold an iterable of stream deltas into one :class:`RathLLMChatResponse`.
Each delta is forwarded to ``on_event`` immediately so callers can drive a
streaming UI; the final accumulated message is then returned for the
session loop to append to its chunk_table as a single atomic chunk.
"""
text_parts: list[str] = []
tool_buckets: dict[int, dict[str, Any]] = {}
finish: RathLLMFinishReason | None = None
usage: RathLLMTokenUsage | None = None
for d in deltas:
on_event(d)
if d.content_delta:
text_parts.append(d.content_delta)
if d.tool_call_index is not None or d.tool_call_id is not None:
idx = d.tool_call_index if d.tool_call_index is not None else 0
bucket = tool_buckets.setdefault(
idx, {"id": "", "name": "", "arguments": ""}
)
if d.tool_call_id:
bucket["id"] = d.tool_call_id
if d.tool_call_name_delta:
bucket["name"] = (bucket["name"] or "") + d.tool_call_name_delta
if d.tool_call_args_delta:
bucket["arguments"] = (
bucket["arguments"] or ""
) + d.tool_call_args_delta
if d.finish_reason is not None:
finish = d.finish_reason
if d.usage is not None:
usage = d.usage
tool_calls: tuple[RathLLMToolCallPart, ...] | None = None
if tool_buckets:
parts: list[RathLLMToolCallPart] = []
for _, bucket in sorted(tool_buckets.items()):
arg_str = bucket.get("arguments") or ""
parsed, parse_error = parse_tool_arguments(arg_str)
parts.append(
RathLLMToolCallPart(
id=str(bucket.get("id") or ""),
type="function",
function=RathLLMToolCallFunction(
name=str(bucket.get("name") or ""),
arguments=arg_str,
arguments_parsed=parsed,
arguments_parse_error=parse_error,
),
)
)
tool_calls = tuple(parts)
content_text = "".join(text_parts) if text_parts else None
if finish is None:
finish = "tool_calls" if tool_calls else "stop"
return RathLLMChatResponse(
id="",
choices=(
RathLLMChatChoice(
index=0,
finish_reason=finish,
message=RathLLMAssistantMessage(
role="assistant",
content=content_text,
tool_calls=tool_calls,
),
),
),
created=0,
model=model,
usage=usage,
)
[docs]
class StreamingExecutor:
"""Adapt a :class:`StreamingChatClient` to the :class:`SessionLoopExecutor` protocol.
``complete()`` consumes the client's ``complete_stream(req)``, forwards
each delta to ``on_event``, and returns the accumulated response. Tool
dispatch and schema lookup are delegated to an inner executor (a fresh
:class:`DefaultSessionLoopExecutor` wrapping the same client when one is
not supplied).
"""
__slots__ = ("client", "_on_event", "_inner")
def __init__(
self,
client: StreamingChatClient,
on_event: Callable[[RathLLMStreamDelta], None],
inner: SessionLoopExecutor | None = None,
) -> None:
self.client = client
self._on_event = on_event
self._inner = inner or DefaultSessionLoopExecutor(client)
def complete(self, req: RathLLMChatRequest) -> RathLLMChatResponse:
return _accumulate_stream_to_response(
self.client.complete_stream(req),
on_event=self._on_event,
model=getattr(self.client.provider, "model", "") or "",
)
def dispatch_tool(
self, session: Session, tool: FlowToolCall, arguments: Mapping[str, Any]
) -> Any:
return self._inner.dispatch_tool(session, tool, arguments)
def tool_schemas(self) -> tuple[RathLLMFunctionTool, ...]:
return self._inner.tool_schemas()
def resolve_executor(
*,
agent_provider: Provider,
executor: SessionLoopExecutor | None,
on_event: OnEventCb | None,
) -> SessionLoopExecutor:
"""Pick the executor for ``run_session_loop`` / ``run_session_compress``.
A caller-supplied ``executor`` is returned as-is (and is incompatible with
``on_event``). Otherwise a chat client is built from ``agent_provider`` —
streaming when ``on_event`` is set, default otherwise.
"""
if executor is not None and on_event is not None:
raise ValueError(
"on_event with a custom executor is not supported; "
"wrap your client with StreamingExecutor and pass that as executor=."
)
if executor is not None:
return executor
client = chat_client_for(agent_provider)
if on_event is not None:
if not isinstance(client, StreamingChatClient):
raise TypeError(
"on_event requires a StreamingChatClient; "
f"{type(client).__name__} (provider_kind="
f"{agent_provider.provider_kind!r}) does not implement "
"complete_stream(req). Drop on_event for non-streaming."
)
return StreamingExecutor(client, on_event)
return DefaultSessionLoopExecutor(client)
def _sync_loop_out_rows(out: Session, rows_list: list[ChunkRow]) -> None:
# Bypass the lazy property setter so this is callable from runtime
# coroutines without poking the synchronize() lock.
out._chunk_table = ChunkTable(rows=tuple(rows_list))
def _accumulate_usage_and_check_budget(
out: Session,
resp: RathLLMChatResponse,
provider: Provider,
) -> None:
"""Fold ``resp.usage`` into ``out.cumulative_usage`` and trip the budget guard.
The guard fires **only on the completion that first pushes the running
total past ``provider.budget_total_tokens``**, never again for the same
``out`` session. That keeps a multi-round tool-calling loop from
re-invoking the callback (or re-logging the warning) every round once the
cap has already been crossed; callers that want to abort the loop are
expected to raise :class:`~rath.llm.BudgetExceededError` from the
callback on that first call.
The latch is implicit in the prev/new transition (``prev <= cap`` and
``new > cap``); no new session state is introduced.
"""
if resp.usage is None:
return
# Access the private slot directly so this helper is safe to call from
# the runtime coroutine, where reading ``out.cumulative_usage`` (the
# lazy property) would deadlock on the still-pending future.
current = out._cumulative_usage
prev_total = current.total_tokens if current is not None else 0
out._cumulative_usage = add_usage(current, resp.usage)
cap = provider.budget_total_tokens
if cap is None or out._cumulative_usage is None:
return
new_total = out._cumulative_usage.total_tokens
if new_total <= cap or prev_total > cap:
return
callback = provider.on_budget_exceeded
if callback is None:
logger.warning(
"session %s exceeded budget_total_tokens=%d "
"(cumulative=%d); no callback configured",
out.id,
cap,
new_total,
)
return
callback(out, out._cumulative_usage)
def _loop_tool_error_payload(
kind: str, message: str, *, detail: str | None = None
) -> str:
"""JSON string for a tool failure returned as the next ``role=tool`` body."""
payload: dict[str, Any] = {
"ok": False,
"error_kind": kind,
"message": message,
}
if detail:
payload["detail"] = detail[:4000]
return json.dumps(payload)
def _summarize_inline_result(obj: Any) -> str:
"""JSON text for arbitrary tool return values."""
try:
if isinstance(obj, BaseModel):
payload: Any = obj.model_dump(mode="json")
else:
payload = obj
text = json.dumps(payload, ensure_ascii=False, default=str)
if len(text) > 48_000:
text = text[:48_000] + "...(truncated)"
return text
except TypeError:
return json.dumps({"repr": repr(obj), "type": type(obj).__name__})
def _summarize_tool_result(_call: FlowToolCall, raw: ToolResult | bool) -> str:
"""JSON text for the next ``role=tool`` message."""
if isinstance(raw, ToolExecutionFailure):
return json.dumps(
{
"ok": False,
"error_kind": raw.kind,
"message": raw.message,
**({"detail": raw.detail} if raw.detail else {}),
}
)
if isinstance(raw, bool):
return json.dumps({"ok": raw})
if isinstance(raw, CommandResult):
return json.dumps(
{
"exit_code": raw.exit_code,
"stdout": decode_subprocess_output(raw.stdout),
"stderr": decode_subprocess_output(raw.stderr),
"elapsed_ms": raw.elapsed_ms,
}
)
if isinstance(raw, FileContent):
data = raw.data
if isinstance(data, bytes):
data = decode_subprocess_output(data)
text = str(data)
if len(text) > 12_000:
text = text[:12_000] + "...(truncated)"
return json.dumps({"data": text})
if isinstance(raw, FileEntries):
payload = [
{"name": e.name, "path": e.path, "is_dir": e.is_dir}
for e in raw.entries[:500]
]
return json.dumps({"entries": payload})
if isinstance(raw, FileWriteResult):
return json.dumps({"bytes_written": raw.bytes_written})
if isinstance(raw, CodeResult):
stdout = decode_subprocess_output(raw.stdout)
stderr = decode_subprocess_output(raw.stderr)
return json.dumps(
{
"text": raw.text,
"stdout": stdout,
"stderr": stderr,
"error": raw.error,
}
)
return json.dumps({"type": type(raw).__name__, "note": "unserialised result"})
def _summarize_dispatch_result(tool: FlowToolCall, raw: Any) -> str:
if isinstance(raw, ToolResult) or isinstance(raw, bool):
return _summarize_tool_result(tool, raw)
return _summarize_inline_result(raw)
def _append_and_persist(
out: Session,
rows_list: list[ChunkRow],
row: ChunkRow,
writer: SessionWriter | None,
) -> None:
"""Append ``row`` to ``rows_list`` (and JSONL if a writer is set).
Does **not** rebuild ``out.chunk_table`` per row — the caller materialises
once per assistant turn via :func:`_sync_loop_out_rows`, plus one final
rebuild after the loop. This keeps the loop O(rounds) rather than the
O(rows²) shape that comes from rebuilding the tuple on every append.
"""
rows_list.append(row)
if writer is not None:
writer.write_chunk(len(rows_list) - 1, row)
[docs]
def run_session_loop(
user_session: Session,
agent_session: Session,
*,
agent_provider: Provider,
tools: list[FlowToolCall] | None = None,
executor: SessionLoopExecutor | None = None,
max_tool_rounds: int = 64,
on_event: Callable[[RathLLMStreamDelta], None] | None = None,
persist: bool = False,
persist_path: Path | None = None,
sandbox_handle_id: str | None = None,
lazy: bool = True,
) -> Session:
"""Run one multi-turn assistant pass with optional tool rounds.
Built-in tools come from :func:`~rath.flow.tool.global_system_tools`; pass
instantiated :class:`~rath.flow.tool.FlowToolCall` objects in ``tools`` to
add or override. Shadowing built-in names is disallowed.
Shares the ``BackendSandbox`` from ``user_session`` with the returned
session (refcount + 1); the user session keeps its reference and either
side can :meth:`Session.close_sandbox` independently. LLM routing kwargs
come from ``agent_provider``; completions and tool dispatch go through
``executor`` (a fresh :class:`DefaultSessionLoopExecutor` is built when
omitted).
When ``on_event`` is provided, completions stream — the resolved client
must satisfy :class:`~rath.llm.StreamingChatClient`, otherwise a
:class:`TypeError` is raised before any session is registered. Each
:class:`RathLLMStreamDelta` is forwarded to ``on_event``; chunks are still
appended atomically (one accumulated assistant message per round).
When ``persist`` is true or ``persist_path`` is given, every appended row
is written to ``.openrath/sessions/<out.id>.jsonl`` (or to
``persist_path``). On graceful return the trailer is written; on
exception the file is abandoned without a trailer.
Message assembly concatenates ``agent_session.chunk_table`` ahead of the
user rows for the LLM; head rows stay out of ``out.chunk_table``.
When ``lazy=True`` (the default), the returned :class:`Session` is a lazy
handle: lineage attributes, ``id``, and ``sandbox`` are set immediately,
but the transcript materialises only when the caller reads
``out.chunk_table`` (or calls ``out.synchronize()``). The runtime
executes the loop on a background asyncio loop so multiple
``run_session_loop`` calls can overlap.
"""
# Join lazy input sessions before submitting the loop coroutine.
if user_session._pending is not None:
user_session.synchronize()
if agent_session._pending is not None:
agent_session.synchronize()
# The async loop materialises in-process. The lazy=False path lets
# callers opt out of the lazy facade for tests/debugging — same return
# shape, just block synchronously in the runtime.
from rath._async.aloop import _arun_session_loop
from rath._async.lazy import LazyValue
from rath._async.runtime import runtime
# Pre-build ``out`` so callers can read .id / .sandbox / lineage eagerly.
rows_seed: list[ChunkRow] = list(user_session._chunk_table.rows)
out = Session(
chunk_table=ChunkTable(rows=tuple(rows_seed)),
sandbox_backend=user_session.sandbox_backend,
_sandbox_open_spec=user_session._sandbox_open_spec,
lineage=SessionLineage(
producer_user_session_id=user_session.id,
producer_system_session_id=agent_session.id,
),
)
if user_session.sandbox is not None and not user_session.sandbox.closed:
out.bind_sandbox(user_session.sandbox)
LineageRecorder.stamp_new_session(
out,
parent_session_ids=(user_session.id, agent_session.id),
lineage_operator="run_session_loop",
lineage_kind=LineageKind.OP_SESSION_LOOP,
)
reg = session_registry()
reg.register(user_session)
reg.register(agent_session)
reg.register(out)
reg.set_active(out)
async def _coro() -> tuple[ChunkTable, RathLLMTokenUsage | None]:
# ``_arun_session_loop`` wraps sync executors via ``_SyncExecutorAsyncAdapter``;
# the cast is the type-checker hand-off for that runtime adapter.
from typing import cast
materialized = await _arun_session_loop(
user_session,
agent_session,
agent_provider=agent_provider,
tools=tools,
executor=cast(Any, executor),
max_tool_rounds=max_tool_rounds,
on_event=on_event,
persist=persist,
persist_path=persist_path,
sandbox_handle_id=sandbox_handle_id,
out=out,
)
return materialized._chunk_table, materialized._cumulative_usage
fut = runtime().submit(_coro())
lazy_val = LazyValue(fut)
out._set_pending(lazy_val)
if not lazy:
out.synchronize()
return out
__all__ = [
"OnEventCb",
"SessionLoopExecutor",
"StreamingExecutor",
"run_session_loop",
]