"""Session — chunk transcript with optional sandbox binding and lazy materialization.
Public surface is intentionally synchronous: callers construct a
:class:`Session`, run a workflow, and read ``out.chunk_table``. Under the
hood, :func:`~rath.session.loop.run_session_loop` may return an ``out``
whose ``chunk_table`` / ``cumulative_usage`` are still being materialized
on :class:`~rath._async.runtime.OpenRathRuntime`. Reading either attribute
implicitly calls :meth:`Session.synchronize`, which blocks until the
runtime publishes the result. Lineage attributes are eager and do not
trigger synchronize().
"""
from __future__ import annotations
import logging
import threading
import weakref
from types import TracebackType
from typing import TYPE_CHECKING, Any
from uuid import UUID, uuid4
from rath.backend import BackendSandbox, BackendSandboxSpec, get
from rath.llm import add_usage
from rath.llm.chat_response import RathLLMTokenUsage
from rath.session.chunk import ChunkKind, ChunkRow, ChunkTable
from rath.session.graph.kind import LineageKind
from rath.session.graph.legacy import SessionLineage
from rath.session.graph.recording import LineageRecorder
if TYPE_CHECKING:
from rath._async.lazy import LazyValue
logger = logging.getLogger(__name__)
# Thread-local re-entrancy guard for tools running on the runtime's thread
# pool. ``_arun_session_loop`` dispatches each tool through
# ``asyncio.to_thread``; that worker thread marks itself as "in dispatch"
# before calling the tool body so any ``session.chunk_table`` read inside the
# tool short-circuits the lazy ``synchronize()`` (which would otherwise wait on
# the in-flight future the worker is part of, deadlocking).
#
# The state is not a boolean: it is a per-thread nesting counter kept in the
# ``depth`` attribute (a missing attribute is read as 0). It is incremented by
# ``_enter_tool_dispatch`` and decremented, never below zero, by
# ``_exit_tool_dispatch``.
_TOOL_DISPATCH_TLS = threading.local()
def _in_tool_dispatch() -> bool:
    """Return ``True`` while the calling thread's dispatch depth is positive."""
    # A thread that never entered a dispatch has no ``depth`` attribute yet.
    depth = getattr(_TOOL_DISPATCH_TLS, "depth", 0)
    return depth > 0
def _enter_tool_dispatch() -> None:
    """Increment the calling thread's tool-dispatch nesting depth by one."""
    current = getattr(_TOOL_DISPATCH_TLS, "depth", 0)
    _TOOL_DISPATCH_TLS.depth = current + 1
def _exit_tool_dispatch() -> None:
    """Decrement the calling thread's tool-dispatch depth, clamping at zero."""
    remaining = getattr(_TOOL_DISPATCH_TLS, "depth", 0) - 1
    # An unbalanced exit must not drive the counter negative.
    _TOOL_DISPATCH_TLS.depth = remaining if remaining > 0 else 0
def _coerce_sandbox_open_spec(
    spec: BackendSandboxSpec | str | None,
) -> BackendSandboxSpec | None:
    """Normalize a sandbox open spec.

    A ``str`` is treated as a ``working_dir`` path and wrapped in a
    :class:`~rath.backend.abc.BackendSandboxSpec`; ``None`` and any other
    value are handed back unchanged.
    """
    if isinstance(spec, str):
        return BackendSandboxSpec(working_dir=spec)
    # ``None`` falls through here as well: it is not a ``str``.
    return spec
# Number of rows ``Session.__str__`` asks ``_format_chunks_block`` to show at
# each end of the transcript before the middle is elided.
_SESSION_REPR_CHUNK_EDGE = 4
# Default ``max_chars`` budget for ``_preview_text`` (message / tool bodies).
_SESSION_REPR_TEXT_MAX = 256
# ``max_chars`` budget used for each tool call's ``arguments`` preview.
_SESSION_REPR_TOOL_ARGS_MAX = 160
def _preview_text(s: str, *, max_chars: int = _SESSION_REPR_TEXT_MAX) -> str:
    """Render ``s`` on one line, eliding the middle of long values.

    Line breaks (``\\r\\n``, ``\\r``, ``\\n``) are rewritten as a literal
    backslash-n. If the flattened text is longer than ``max_chars`` (and
    ``max_chars`` is greater than 8) the result keeps an equal-sized prefix
    and suffix joined by `` ... ``; otherwise the flattened text is returned
    whole.
    """
    if not s:
        return ""
    unified = s.replace("\r\n", "\n").replace("\r", "\n")
    flat = "\\n".join(unified.split("\n"))
    needs_cut = max_chars > 8 and len(flat) > max_chars
    if not needs_cut:
        return flat
    # Reserve 5 characters for the " ... " separator, split the rest evenly.
    keep = max(1, (max_chars - 5) // 2)
    return flat[:keep] + " ... " + flat[-keep:]
def _format_chunk_row(index: int, row: ChunkRow) -> str:
    """Format one transcript row as ``[index] kind: summary`` for the repr.

    SYSTEM / USER rows show a preview of ``content``; ASSISTANT rows show the
    text preview and/or the tool calls (``name(args)``), or ``(empty)`` when
    neither is present; TOOL_RESULT rows show name, call id and a body
    preview. Any other kind falls back to the raw payload repr.
    """
    chunk_kind, payload = row.kind, row.payload

    if chunk_kind == ChunkKind.ASSISTANT:
        pieces: list[str] = []
        text = payload.get("content")
        # Whitespace-only content is treated the same as missing content.
        if text is not None and str(text).strip():
            pieces.append(f"text={_preview_text(str(text))!r}")

        def describe(call: Any) -> str:
            function = call.get("function") or {}
            tool_name = str(function.get("name", "?"))
            tool_args = _preview_text(
                str(function.get("arguments", "")),
                max_chars=_SESSION_REPR_TOOL_ARGS_MAX,
            )
            return f"{tool_name}({tool_args!r})"

        calls = payload.get("tool_calls") or []
        if calls:
            rendered_calls = [describe(call) for call in calls]
            pieces.append(f"tools=[{', '.join(rendered_calls)}]")
        summary = ", ".join(pieces) or "(empty)"
        return f"[{index}] {chunk_kind.value}: {summary}"

    if chunk_kind == ChunkKind.TOOL_RESULT:
        tool_name = str(payload.get("name", ""))
        call_id = str(payload.get("tool_call_id", ""))
        preview = _preview_text(str(payload.get("content", "")))
        return (
            f"[{index}] {chunk_kind.value}: "
            f"name={tool_name!r}, id={call_id!r}, body={preview!r}"
        )

    if chunk_kind in (ChunkKind.SYSTEM, ChunkKind.USER):
        preview = _preview_text(str(payload.get("content", "")))
        return f"[{index}] {chunk_kind.value}: {preview!r}"

    return f"[{index}] {chunk_kind.value}: {payload!r}"
def _format_chunks_block(rows: tuple[ChunkRow, ...], *, edge: int) -> str:
    """Format ``rows`` as a bracketed, one-row-per-line block.

    An empty transcript renders as ``[]``. Up to ``2 * edge + 1`` rows are
    listed in full; beyond that only the first and last ``edge`` rows are
    shown, with a marker line stating how many were omitted in between.
    """
    total = len(rows)
    if not total:
        return "[]"

    def render(positions: range) -> list[str]:
        return [_format_chunk_row(pos, rows[pos]) for pos in positions]

    if total > 2 * edge + 1:
        skipped = total - 2 * edge
        rendered = [
            *render(range(edge)),
            f"... ({skipped} chunks omitted) ...",
            *render(range(total - edge, total)),
        ]
    else:
        rendered = render(range(total))
    return "[\n " + ",\n ".join(rendered) + "\n ]"
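# NOTE(review): a stray "[docs]" marker stood here. It looks like Sphinx viewcode link residue rather than Python, so it is kept only as this comment.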
class Session:
    """Chunk transcript (:attr:`chunk_table`), optional sandbox, and lineage metadata.

    Sandbox placement is **torch-like**: :attr:`sandbox_backend` is ``None`` until
    you call :meth:`to` (or :meth:`bind_sandbox`). The handle in :attr:`sandbox` is
    opened **lazily** on first use (:meth:`require_sandbox` or entering
    ``with session:``). Every ``self.sandbox`` slot counts as one reference on the
    :class:`~rath.backend.BackendSandbox` instance; :meth:`close_sandbox` drops it,
    and the backend ``close`` is called only when the reference count reaches zero.
    ``with session:`` is optional; when used, the outermost exit calls
    :meth:`close_sandbox`.

    Lazy materialization: when a session is returned from
    :func:`~rath.session.loop.run_session_loop`, it may carry an
    in-flight :class:`~rath._async.lazy.LazyValue` in :attr:`_pending`. Reading
    :attr:`chunk_table` or :attr:`cumulative_usage` calls :meth:`synchronize`,
    which blocks until the runtime publishes the materialized values.
    Lineage attributes (:attr:`parent_session_ids`, :attr:`lineage_operator`,
    :attr:`lineage_kind`, :attr:`lineage_extras`) are eager and never trigger
    synchronize.

    Sharing semantics: :func:`~rath.session.loop.run_session_loop`,
    :func:`~rath.session.compress.run_session_compress`, :meth:`fork`,
    :meth:`detach`, and :meth:`merge` all bind the new session to the **same**
    sandbox object as the source (refcount + 1). The source session keeps its
    reference. :meth:`detach` differs from :meth:`fork` only in lineage:
    :meth:`fork` records ``parent_session_ids=(self.id,)``; :meth:`detach`
    records an empty parent tuple. :meth:`merge` always keeps ``self.sandbox``
    (the first session's); ``other.sandbox`` is ignored, and ``other`` keeps
    its own reference.

    Flat lineage (preferred graph substrate): :attr:`parent_session_ids` (ordered
    parents), :attr:`lineage_operator`, :attr:`lineage_kind`, :attr:`lineage_extras`.
    :attr:`lineage` is an optional legacy DTO tying loop outputs to producer sessions.
    """

    __slots__ = (
        "_chunk_table",
        "id",
        "sandbox",
        "sandbox_backend",
        "_sandbox_open_spec",
        "_cm_depth",
        "lineage",
        "parent_session_ids",
        "lineage_operator",
        "lineage_kind",
        "lineage_extras",
        "_cumulative_usage",
        "_pending",
        "_sync_lock",
        "__weakref__",
    )

    def __init__(
        self,
        chunk_table: ChunkTable,
        *,
        id: UUID | None = None,
        sandbox: BackendSandbox | None = None,
        sandbox_backend: str | None = None,
        _sandbox_open_spec: BackendSandboxSpec | None = None,
        _cm_depth: int = 0,
        lineage: SessionLineage | None = None,
        parent_session_ids: tuple[UUID, ...] = (),
        lineage_operator: str = "implicit",
        lineage_kind: LineageKind = LineageKind.UNKNOWN,
        lineage_extras: tuple[tuple[str, Any], ...] = (),
        cumulative_usage: RathLLMTokenUsage | None = None,
    ) -> None:
        """Store the transcript and metadata; the session starts with nothing pending.

        ``id`` defaults to a fresh :func:`uuid.uuid4`. Every other argument is
        stored as given. Note that a ``sandbox`` passed here is stored without
        calling ``acquire()`` on it; use :meth:`bind_sandbox` to take a counted
        reference.
        """
        self._chunk_table = chunk_table
        self.id = id if id is not None else uuid4()
        self.sandbox = sandbox
        self.sandbox_backend = sandbox_backend
        self._sandbox_open_spec = _sandbox_open_spec
        self._cm_depth = _cm_depth
        self.lineage = lineage
        self.parent_session_ids = parent_session_ids
        self.lineage_operator = lineage_operator
        self.lineage_kind = lineage_kind
        self.lineage_extras = lineage_extras
        self._cumulative_usage = cumulative_usage
        # In-flight materialization attached by the runtime via ``_set_pending``.
        self._pending: LazyValue[tuple[ChunkTable, RathLLMTokenUsage | None]] | None = (
            None
        )
        # Serializes ``synchronize()`` so only one caller consumes ``_pending``.
        self._sync_lock = threading.Lock()

    @property
    def chunk_table(self) -> ChunkTable:
        """Materialized transcript. Blocks on ``_pending`` if still in flight.

        Re-entrancy: when a tool dispatched by the runtime reads
        ``session.chunk_table`` from inside its own producing future,
        ``synchronize()`` would deadlock (the future cannot complete until
        the tool returns). In that case we read the in-flight
        ``_chunk_table`` directly — tools see the transcript as it grows.
        """
        if self._pending is not None and not _in_tool_dispatch():
            self.synchronize()
        return self._chunk_table

    @chunk_table.setter
    def chunk_table(self, value: ChunkTable) -> None:
        # Runtime-internal writers (session.loop._sync_loop_out_rows,
        # _async.aloop) call this; setting bypasses the lazy gate because the
        # materialization itself is what produces these writes.
        self._chunk_table = value

    @property
    def cumulative_usage(self) -> RathLLMTokenUsage | None:
        """Token usage accumulated so far, or ``None`` if none was recorded.

        Uses the same lazy gate as :attr:`chunk_table`: a pending
        materialization is synchronized first unless the calling thread is
        inside a tool dispatch.
        """
        if self._pending is not None and not _in_tool_dispatch():
            self.synchronize()
        return self._cumulative_usage

    @cumulative_usage.setter
    def cumulative_usage(self, value: RathLLMTokenUsage | None) -> None:
        # Plain write; like the ``chunk_table`` setter it does not synchronize.
        self._cumulative_usage = value

    def synchronize(self) -> Session:
        """Block until ``_pending`` resolves; publish staged values; return self.

        Idempotent — repeated calls (including from multiple threads) only
        materialize once. Exceptions raised by the in-flight future are
        re-raised here, once, after ``_pending`` has been cleared so that
        subsequent reads do not block again.
        """
        # Fast-path read without lock; the slow path re-checks under the lock.
        if self._pending is None:
            return self
        with self._sync_lock:
            pending = self._pending
            if pending is None:
                # Another thread finished materializing while we waited.
                return self
            try:
                table, usage = pending.result()
            except BaseException:
                # Even on failure, mark consumed and drop pending so later
                # reads do not hang on the same future. They will not
                # re-raise either: with ``_pending`` cleared, the properties
                # return whatever ``_chunk_table`` / ``_cumulative_usage``
                # held before the failed materialization.
                pending.mark_consumed()
                self._pending = None
                raise
            pending.mark_consumed()
            # Publish data fields before clearing _pending so readers treat
            # `_pending is None` as the happens-before signal.
            self._chunk_table = table
            self._cumulative_usage = usage
            self._pending = None
        return self

    def _set_pending(
        self,
        lazy: LazyValue[tuple[ChunkTable, RathLLMTokenUsage | None]],
    ) -> None:
        """Internal hook used by the runtime to attach an in-flight materialization.

        Installs a :func:`weakref.finalize` that fires on session GC and emits
        :func:`rath._async.lazy.unraisable_warn` only if the materialization
        actually finished with an unobserved exception. Doing this at finalize
        time (rather than as a future done-callback) avoids racing with a
        ``synchronize()`` call that observes the error on the very next line.
        """
        self._pending = lazy
        from rath._async.lazy import unraisable_warn

        # The finalizer receives ``lazy`` and the id, not ``self``, so it does
        # not keep the session alive.
        weakref.finalize(self, unraisable_warn, lazy, self.id)

    @classmethod
    def from_agent_prompt(cls, prompt: str) -> Session:
        """Build a session whose transcript is a single SYSTEM chunk holding ``prompt``."""
        from rath.session.chunk import system_text_chunk

        return cls(
            chunk_table=ChunkTable(rows=(system_text_chunk(prompt),)),
        )

    @classmethod
    def from_user_message(cls, message: str) -> Session:
        """Build a session whose transcript is a single USER chunk holding ``message``."""
        from rath.session.chunk import user_text_chunk

        return cls(
            chunk_table=ChunkTable(rows=(user_text_chunk(message),)),
        )

    @classmethod
    def create(cls, kind: str = "user", text: str = "") -> Session:
        """Friendly single-entry constructor with lineage stamping.

        ``kind`` is one of:

        - ``"user"`` — single USER chunk holding ``text``; stamps ``LEAF_USER``.
        - ``"system"`` — single SYSTEM chunk holding ``text``; stamps ``LEAF_SYSTEM``.
        - ``"empty"`` — zero-row transcript; ``text`` is ignored; no lineage stamp.

        The returned session is **unbound** (no sandbox). Chain ``.to(backend)``
        to pick a backend; the handle opens lazily on first use or
        ``with session:``.

        Raises:
            ValueError: if ``kind`` is not one of the three values above.
        """
        from rath.session.chunk import system_text_chunk, user_text_chunk

        if kind == "user":
            make_chunk, leaf_kind = user_text_chunk, LineageKind.LEAF_USER
        elif kind == "system":
            make_chunk, leaf_kind = system_text_chunk, LineageKind.LEAF_SYSTEM
        elif kind == "empty":
            return cls(chunk_table=ChunkTable(rows=()))
        else:
            raise ValueError(
                f"Session.create: unknown kind {kind!r}; "
                "expected one of 'user', 'system', 'empty'"
            )
        # The two leaf kinds differ only in chunk factory and lineage kind.
        session = cls(chunk_table=ChunkTable(rows=(make_chunk(text),)))
        LineageRecorder.stamp_new_session(
            session,
            parent_session_ids=(),
            lineage_operator="Session.create",
            lineage_kind=leaf_kind,
            lineage_extras=(("source", "Session.create"),),
        )
        return session

    def to(
        self,
        backend: str = "local",
        *,
        spec: BackendSandboxSpec | str | None = None,
    ) -> Session:
        """Close any current handle, set target backend, and return ``self`` (chainable).

        ``spec`` may be a :class:`~rath.backend.BackendSandboxSpec`, a
        ``working_dir`` path string, or ``None``.
        """
        self.close_sandbox()
        self.sandbox_backend = backend
        self._sandbox_open_spec = _coerce_sandbox_open_spec(spec)
        return self

    def close_sandbox(self) -> Session:
        """Drop this session's sandbox reference; close when refcount hits zero."""
        held = self.sandbox
        # An already-closed handle is simply forgotten, not released again.
        if held is not None and not held.closed:
            held.release()
        self.sandbox = None
        return self

    def _ensure_sandbox(self) -> None:
        """Make sure :attr:`sandbox` holds an open handle, opening one if needed.

        A held handle that reports ``closed`` is discarded and replaced.

        Raises:
            RuntimeError: if no :attr:`sandbox_backend` has been configured.
        """
        held = self.sandbox
        if held is not None:
            if not held.closed:
                return
            self.sandbox = None
        if self.sandbox_backend is None:
            raise RuntimeError(
                'session has no sandbox backend; call session.to("local") '
                "or session.bind_sandbox(...)"
            )
        open_spec = _coerce_sandbox_open_spec(self._sandbox_open_spec)
        opened = get(self.sandbox_backend).open(open_spec)
        # The ``sandbox`` slot counts as one reference on the handle.
        opened.acquire()
        self.sandbox = opened

    def __enter__(self) -> Session:
        """Enter a ``with`` level; the outermost entry opens the sandbox lazily."""
        if self._cm_depth == 0:
            self._ensure_sandbox()
        # Incremented only after a successful open, so a failed outermost
        # entry leaves the depth untouched.
        self._cm_depth += 1
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        """Leave a ``with`` level; the outermost exit calls :meth:`close_sandbox`.

        Returns ``None``, so an exception raised in the ``with`` body propagates.
        """
        self._cm_depth -= 1
        if self._cm_depth == 0:
            self.close_sandbox()

    def require_sandbox(self) -> BackendSandbox:
        """Return the session's sandbox, opening it lazily when none is held.

        Unlike :meth:`_ensure_sandbox`, a handle that is held but already
        closed is an error here rather than a trigger to reopen.

        Raises:
            RuntimeError: if the held sandbox is closed, or if no sandbox is
                held and no :attr:`sandbox_backend` has been configured.
        """
        held = self.sandbox
        if held is not None:
            if held.closed:
                raise RuntimeError("session sandbox is closed")
            return held
        # Raises the "no sandbox backend" RuntimeError itself when unconfigured.
        self._ensure_sandbox()
        assert self.sandbox is not None
        return self.sandbox

    def bind_sandbox(self, sandbox: BackendSandbox) -> Session:
        """Take a reference to ``sandbox`` (refcount + 1); release the previous one.

        Re-binding the sandbox already held is a no-op. On success
        :attr:`sandbox_backend` and the stored open spec are updated to match
        ``sandbox``.
        """
        if self.sandbox is sandbox:
            return self
        # Read the new handle's metadata and take the new reference *before*
        # touching the old one. If any of these steps raises, the session still
        # owns a valid reference to its previous sandbox instead of pointing
        # at a handle it has already released.
        backend_name = sandbox.backend.name
        open_spec = sandbox.spec
        sandbox.acquire()
        previous = self.sandbox
        self.sandbox = sandbox
        self.sandbox_backend = backend_name
        self._sandbox_open_spec = open_spec
        # An already-closed previous handle has nothing left to release.
        if previous is not None and not previous.closed:
            previous.release()
        return self

    def _derive(
        self,
        rows: tuple[ChunkRow, ...],
        *,
        cumulative_usage: RathLLMTokenUsage | None = None,
    ) -> Session:
        """Build a new session over ``rows`` that shares this session's sandbox setup.

        The new session copies :attr:`sandbox_backend` and the stored open
        spec; if this session currently holds an open sandbox, the new session
        is bound to the same object (refcount + 1). Lineage is left for the
        caller to stamp.
        """
        derived = Session(
            chunk_table=ChunkTable(rows=rows),
            sandbox_backend=self.sandbox_backend,
            _sandbox_open_spec=self._sandbox_open_spec,
            cumulative_usage=cumulative_usage,
        )
        if self.sandbox is not None and not self.sandbox.closed:
            derived.bind_sandbox(self.sandbox)
        return derived

    def fork(self) -> Session:
        """Duplicate transcript; share the same sandbox reference (refcount + 1).

        The fork records ``parent_session_ids=(self.id,)``. Reading
        :attr:`chunk_table` here synchronizes any pending materialization.
        """
        forked = self._derive(tuple(self.chunk_table.rows))
        LineageRecorder.stamp_new_session(
            forked,
            parent_session_ids=(self.id,),
            lineage_operator="Session.fork",
            lineage_kind=LineageKind.OP_FORK,
        )
        return forked

    def detach(self) -> Session:
        """Duplicate transcript with a fresh lineage root; share the sandbox reference."""
        detached = self._derive(tuple(self.chunk_table.rows))
        LineageRecorder.stamp_new_session(
            detached,
            parent_session_ids=(),
            lineage_operator="Session.detach",
            lineage_kind=LineageKind.OP_DETACH,
            lineage_extras=(),
        )
        return detached

    def merge(self, other: Session) -> Session:
        """Concatenate ``self.rows + other.rows`` into a new session.

        The merged session **always keeps** ``self.sandbox`` — the first
        session's. ``other.sandbox`` is ignored regardless of whether it is
        the same instance, a different one, or ``None``; ``other`` keeps its
        own reference. Refcount on ``self.sandbox`` is bumped by 1 when set.
        :attr:`cumulative_usage` is summed across both inputs. Lineage parents
        are ``(self.id, other.id)``, kind is :attr:`LineageKind.OP_MERGE`.

        The only remaining hard constraint: when **both** sessions are unbound
        and they declare different ``sandbox_backend`` targets, merging is
        ambiguous — raises :class:`ValueError`.
        """
        both_unbound = self.sandbox is None and other.sandbox is None
        if both_unbound and self.sandbox_backend != other.sandbox_backend:
            raise ValueError(
                "cannot merge unbound sessions targeting different backends: "
                f"{self.sandbox_backend!r} vs {other.sandbox_backend!r}"
            )
        merged_rows = self.chunk_table.rows + other.chunk_table.rows
        merged_usage = add_usage(self.cumulative_usage, other.cumulative_usage)
        merged = self._derive(merged_rows, cumulative_usage=merged_usage)
        LineageRecorder.stamp_new_session(
            merged,
            parent_session_ids=(self.id, other.id),
            lineage_operator="Session.merge",
            lineage_kind=LineageKind.OP_MERGE,
        )
        return merged

    def __str__(self) -> str:
        """Render a bounded preview of the transcript plus the lineage operator."""
        cls_name = type(self).__name__
        # Avoid triggering synchronize() in repr -- show pending state instead.
        if self._pending is not None:
            return f"{cls_name}(pending …, operator={self.lineage_operator!r})"
        # Read the backing field directly: nothing is pending at this point.
        block = _format_chunks_block(
            self._chunk_table.rows, edge=_SESSION_REPR_CHUNK_EDGE
        )
        return (
            f"{cls_name}(\n chunks={block},\n operator={self.lineage_operator!r},\n)"
        )

    def __repr__(self) -> str:
        """Same text as :meth:`__str__`."""
        return self.__str__()