rath.session#

Session state, chunk transcript, session loop, lazy materialization, context compression, and lineage graph.

Source#

Module

Source

rath.session.session

src/rath/session/session.py

rath.session.chunk

src/rath/session/chunk.py

rath.session.loop

src/rath/session/loop.py

rath.session.compress

src/rath/session/compress.py

rath.session.primitives

src/rath/session/primitives.py

rath.session.graph

src/rath/session/graph/

rath.session.manager

src/rath/session/manager.py

rath.session.persistence

src/rath/session/persistence/

Public contract#

Session#

Field

Type

Meaning

chunk_table

ChunkTable

Chronological chunk rows.

id

UUID

Session identity.

sandbox

BackendSandbox | None

Currently open sandbox handle.

sandbox_backend

str | None

Backend name used for lazy open.

parent_session_ids

tuple[UUID, ...]

Lineage parents.

lineage_operator

str

Operation that produced the current session.

lineage_kind

LineageKind

Lineage operation kind.

cumulative_usage

RathLLMTokenUsage | None

Running token usage accumulated by loop/compress and summed by merge(); reading it may synchronize a lazy output session.

Method

Returns

Behavior

Session.from_agent_prompt(prompt)

Session

Creates a single system chunk.

Session.from_user_message(text)

Session

Creates a single user chunk.

session.to(backend="local", spec=None)

Session

Sets the sandbox target and releases the current handle (refcount − 1).

session.bind_sandbox(sandbox)

Session

Releases the current handle and takes a reference on sandbox (refcount + 1).

session.require_sandbox()

BackendSandbox

Returns or lazily opens the current sandbox; acquires one reference on first open.

session.close_sandbox()

Session

Drops this session’s reference; the backend closes when the count reaches zero.

session.fork()

Session

Copies chunk rows and shares the sandbox reference (refcount + 1); parent points to the source session.

session.detach()

Session

Copies chunk rows and shares the sandbox reference; creates a new lineage root.

session.merge(other)

Session

Concatenates self.rows + other.rows, keeps self.sandbox, ignores other.sandbox, and sums cumulative_usage. It only rejects conflicting unbound backend targets.

Session merge lineage

Session.merge(...) joins compatible branches, preserves parent lineage, and keeps sandbox ownership explicit.#

Chunk helpers#

Function

Returns

Purpose

user_text_chunk(text)

ChunkRow

Creates a user row.

system_text_chunk(text)

ChunkRow

Creates a system row.

assistant_turn_chunk(tool_calls, content=None)

ChunkRow

Creates an assistant row.

tool_feedback_chunk(tool_call_id, name, body)

ChunkRow

Creates a tool result row.

chunk_table_to_messages(tab)

tuple[RathLLMMessage, ...]

Converts to chat completion messages.

Loop#

run_session_loop(
    user_session: Session,
    agent_session: Session,
    *,
    agent_provider: Provider,
    tools: list[FlowToolCall] | None = None,
    executor: SessionLoopExecutor | None = None,
    max_tool_rounds: int = 16,
    on_event: Callable[[RathLLMStreamDelta], None] | None = None,
    persist: bool = False,
    persist_path: Path | None = None,
    sandbox_handle_id: str | None = None,
) -> Session

Parameter

Description

user_session

User-side transcript and sandbox placement.

agent_session

Agent/system transcript used in request assembly.

agent_provider

Model and request parameters.

tools

Additional FlowToolCall instances.

executor

Replacement point for completion and tool dispatch.

max_tool_rounds

Maximum number of tool-call rounds.

on_event

When set, each streamed RathLLMStreamDelta is forwarded here; the resolved chat client must satisfy StreamingChatClient.

persist / persist_path

When truthy, every appended chunk is written to .openrath/sessions/<out.id>.jsonl (or the explicit path).

sandbox_handle_id

Optional sandbox identifier persisted in the JSONL header for later reattach.

The output session shares the input user session’s sandbox reference (refcount + 1). The returned Session starts with the user rows, then appends assistant rows and tool_result rows. The output session lineage parents are the user session and agent session.

executor and on_event are mutually exclusive unless the caller manually wraps a streaming client in StreamingExecutor. When on_event is provided, the resolved client must implement complete_stream(req); the built-in OpenAI-compatible and Anthropic clients both support this path in v1.2.

The public API remains synchronous. Internally, v1.2 can return a lazy output session while tool/model work continues on the private async runtime. Session identity, lineage metadata, and sandbox ownership are available immediately; reading chunk_table, reading cumulative_usage, or calling synchronize() waits for pending work.

Compression#

run_session_compress(
    user_session: Session,
    agent_session: Session,
    *,
    agent_provider: Provider,
    executor: SessionLoopExecutor | None = None,
    compress_instruction: str | None = None,
    register_sessions: bool = True,
    on_event: Callable[[RathLLMStreamDelta], None] | None = None,
    persist: bool = False,
    persist_path: Path | None = None,
    sandbox_handle_id: str | None = None,
) -> Session

Returns a user-only session. The compression request uses tools=None and tool_choice="none". A model response with tool calls raises RuntimeError. on_event / persist behavior matches run_session_loop.

Persistence#

Append-only session persistence JSONL

Session persistence writes a header, chunk records, and a trailer to JSONL; a missing trailer marks an interrupted run.#

Async persistence also writes .__partial__ marker files beside in-flight JSONL files. The marker is removed after the trailer is committed, so startup/listing code can distinguish a clean close from a crash or interrupted async write.

API

Behavior

SessionWriter(session, path=None, sandbox_handle_id=None)

Opens an append-only JSONL writer and writes a header.

load_session(id, path=None)

Parses a persisted session JSONL file into PersistedSession.

list_persisted_sessions()

Lists header metadata and closed/crashed state for stored sessions.

delete_session(id)

Deletes one persisted session file.

prune_sessions(older_than=...)

Deletes persisted sessions older than a cutoff.

PersistedSession.to_resumable_pair(agent_prompt=None)

Builds (user_session, agent_session) for another loop call and reattaches the persisted sandbox identity when possible.

Persistence records are newline-delimited JSON:

Record

Meaning

header

Session id, lineage, sandbox backend/spec, optional sandbox handle id.

chunk

One appended chunk row. Inherited user rows are seeded before new loop output rows.

trailer

Graceful close marker plus final cumulative usage. Missing trailer means the process likely crashed mid-write.

Lineage export#

API

Behavior

session_to_jsonl_row(session)

Projects one session into a JSON-ready lineage row.

export_jsonl_string(sessions)

Returns JSONL for an iterable of sessions.

export_jsonl(sessions, path)

Writes lineage JSONL to disk.

export_journal_jsonl(journal, path, skip_unknown=True)

Resolves LineageJournal.visit_order through the registry and exports rows.

Exceptions and edge behavior#

Location

Behavior

Session.require_sandbox()

Raises RuntimeError when no backend target is set.

Session.merge(other)

Keeps self.sandbox even when other points at another open sandbox; raises ValueError only when both sessions are unbound and target different backend names.

Lazy output reads

Reading chunk_table or cumulative_usage blocks until the private async runtime has materialized pending rows.

run_session_loop(on_event=...)

Raises TypeError upfront when the resolved chat client does not implement complete_stream(req).

run_session_loop(...)

Non-JSON tool arguments, unknown tools, and tool execution exceptions are written as JSON error tool_result rows.

run_session_compress(...)

Empty model content, tool calls, and unexpected finish reasons raise RuntimeError.

Autodoc#

class rath.session.Session(chunk_table: ChunkTable, *, id: UUID | None = None, sandbox: BackendSandbox | None = None, sandbox_backend: str | None = None, _sandbox_open_spec: BackendSandboxSpec | None = None, _cm_depth: int = 0, lineage: SessionLineage | None = None, parent_session_ids: tuple[UUID, ...] = (), lineage_operator: str = 'implicit', lineage_kind: LineageKind = LineageKind.UNKNOWN, lineage_extras: tuple[tuple[str, Any], ...] = (), cumulative_usage: RathLLMTokenUsage | None = None)[source]#

Chunk transcript (chunk_table), optional sandbox, and lineage metadata.

Sandbox placement is torch-like: sandbox_backend is None until you call to() (or bind_sandbox()). The handle in sandbox is opened lazily on first use (require_sandbox() or entering with session:). Every self.sandbox slot counts as one reference on the BackendSandbox instance; close_sandbox() drops it, and the backend close is called only when the reference count reaches zero. with session: is optional; when used, the outermost exit calls close_sandbox().

Lazy materialization: when a session is returned from run_session_loop(), it may carry an in-flight LazyValue in _pending. Reading chunk_table or cumulative_usage calls synchronize(), which blocks until the runtime publishes the materialized values. Lineage attributes (parent_session_ids, lineage_operator, lineage_kind, lineage_extras) are eager and never trigger synchronize.

Sharing semantics: run_session_loop(), run_session_compress(), fork(), detach(), and merge() all bind the new session to the same sandbox object as the source (refcount + 1). The source session keeps its reference. detach() differs from fork() only in lineage: fork() records parent_session_ids=(self.id,); detach() records an empty parent tuple. merge() always keeps self.sandbox (the first session’s); other.sandbox is ignored, and other keeps its own reference.

Flat lineage (preferred graph substrate): parent_session_ids (ordered parents), lineage_operator, lineage_kind, lineage_extras. lineage is an optional legacy DTO tying loop outputs to producer sessions.

property chunk_table: ChunkTable#

Materialized transcript. Blocks on _pending if still in flight.

Re-entrancy: when a tool dispatched by the runtime reads session.chunk_table from inside its own producing future, synchronize() would deadlock (the future cannot complete until the tool returns). In that case we read the in-flight _chunk_table directly — tools see the transcript as it grows.

synchronize() Session[source]#

Block until _pending resolves; publish staged values; return self.

Idempotent — repeated calls (including from multiple threads) only materialize once. Exceptions raised by the in-flight future are re-raised here (after _pending is cleared so subsequent reads do not block again).

classmethod create(kind: str = 'user', text: str = '') Session[source]#

Friendly single-entry constructor with lineage stamping.

kind is one of:

  • "user" — single USER chunk holding text; stamps LEAF_USER.

  • "system" — single SYSTEM chunk holding text; stamps LEAF_SYSTEM.

  • "empty" — zero-row transcript; text is ignored; no lineage stamp.

The returned session is unbound (no sandbox). Chain .to(backend) to pick a backend; the handle opens lazily on first use or with session:.

to(backend: str = 'local', *, spec: BackendSandboxSpec | str | None = None) Session[source]#

Close any current handle, set target backend, and return self (chainable).

close_sandbox() Session[source]#

Drop this session’s sandbox reference; close when refcount hits zero.

bind_sandbox(sandbox: BackendSandbox) Session[source]#

Take a reference to sandbox (refcount + 1); release the previous one.

fork() Session[source]#

Duplicate transcript; share the same sandbox reference (refcount + 1).

detach() Session[source]#

Duplicate transcript with a fresh lineage root; share the sandbox reference.

merge(other: Session) Session[source]#

Concatenate self.rows + other.rows into a new session.

The merged session always keeps self.sandbox — the first session’s. other.sandbox is ignored regardless of whether it is the same instance, a different one, or None; other keeps its own reference. Refcount on self.sandbox is bumped by 1 when set. cumulative_usage is summed across both inputs. Lineage parents are (self.id, other.id), kind is LineageKind.OP_MERGE.

The only remaining hard constraint: when both sessions are unbound and they declare different sandbox_backend targets, merging is ambiguous — raises ValueError.

class rath.session.ChunkRow(kind: ChunkKind, payload: dict[str, Any])[source]#

Immutable row in chronological order.

class rath.session.ChunkTable(rows: tuple[ChunkRow, ...] = ())[source]#

Append-only chronological chunk list.

rath.session.run_session_loop(user_session: Session, agent_session: Session, *, agent_provider: Provider, tools: list[FlowToolCall] | None = None, executor: SessionLoopExecutor | None = None, max_tool_rounds: int = 64, on_event: Callable[[RathLLMStreamDelta], None] | None = None, persist: bool = False, persist_path: Path | None = None, sandbox_handle_id: str | None = None, lazy: bool = True) Session[source]#

Run one multi-turn assistant pass with optional tool rounds.

Built-in tools come from global_system_tools(); pass instantiated FlowToolCall objects in tools to add or override. Shadowing built-in names is disallowed.

Shares the BackendSandbox from user_session with the returned session (refcount + 1); the user session keeps its reference and either side can Session.close_sandbox() independently. LLM routing kwargs come from agent_provider; completions and tool dispatch go through executor (a fresh DefaultSessionLoopExecutor is built when omitted).

When on_event is provided, completions stream — the resolved client must satisfy StreamingChatClient, otherwise a TypeError is raised before any session is registered. Each RathLLMStreamDelta is forwarded to on_event; chunks are still appended atomically (one accumulated assistant message per round).

When persist is true or persist_path is given, every appended row is written to .openrath/sessions/<out.id>.jsonl (or to persist_path). On graceful return the trailer is written; on exception the file is abandoned without a trailer.

Message assembly concatenates agent_session.chunk_table ahead of the user rows for the LLM; head rows stay out of out.chunk_table.

When lazy=True (the default), the returned Session is a lazy handle: lineage attributes, id, and sandbox are set immediately, but the transcript materialises only when the caller reads out.chunk_table (or calls out.synchronize()). The runtime executes the loop on a background asyncio loop so multiple run_session_loop calls can overlap.

class rath.session.SessionLoopExecutor(*args, **kwargs)[source]#

Runs completions and tool dispatch used by run_session_loop.

complete(req: RathLLMChatRequest) RathLLMChatResponse[source]#

Run one chat completion.

dispatch_tool(session: Session, tool: FlowToolCall, arguments: Mapping[str, Any]) Any[source]#

Run tool with JSON arguments (typically tool(session, arguments)).

tool_schemas() tuple[RathLLMFunctionTool, ...][source]#

Tool specs for OpenAI-style tools. Empty tuple defers to the loop-local merged registry.

class rath.session.loop.StreamingExecutor(client: StreamingChatClient, on_event: Callable[[RathLLMStreamDelta], None], inner: SessionLoopExecutor | None = None)[source]#

Adapt a StreamingChatClient to the SessionLoopExecutor protocol.

complete() consumes the client’s complete_stream(req), forwards each delta to on_event, and returns the accumulated response. Tool dispatch and schema lookup are delegated to an inner executor (a fresh DefaultSessionLoopExecutor wrapping the same client when one is not supplied).

rath.session.run_session_compress(user_session: Session, agent_session: Session, *, agent_provider: Provider, executor: SessionLoopExecutor | None = None, compress_instruction: str | None = None, register_sessions: bool = True, on_event: Callable[[RathLLMStreamDelta], None] | None = None, persist: bool = False, persist_path: Path | None = None, sandbox_handle_id: str | None = None) Session[source]#

Summarize transcript via LLM into a new user-only session (no SYSTEM chunks).

agent_session and user_session chunks are folded into the completion request only — they are not copied into out.chunk_table. The returned session contains a single USER row built from the model reply.

Completions use tools=None and tool_choice=none. If the model returns tool calls, raises RuntimeError.

When executor is None, a default executor is built from agent_provider; it must carry a non-empty api_key.

Shares the BackendSandbox from user_session with the returned session (refcount + 1) when one is bound; the user session keeps its reference.

When on_event is provided, the completion streams — the resolved client must satisfy StreamingChatClient. Each RathLLMStreamDelta is forwarded to on_event.

When persist is true or persist_path is given, the single output row is written to .openrath/sessions/<out.id>.jsonl (or to persist_path) with a trailer.

rath.session.create_user_session(message: str) Session[source]#

Leaf user transcript; stamps LEAF_USER when lineage mode is on.

rath.session.create_system_session(prompt: str) Session[source]#

Leaf system transcript; stamps LEAF_SYSTEM when lineage mode is on.

rath.session.fork_session(from_session: Session) Session[source]#

Same as fork().

rath.session.detach_session(from_session: Session) Session[source]#

Same as detach().

class rath.session.SessionWriter(session: Session, *, sandbox_handle_id: str | None = None, path: Path | None = None)[source]#

Append-only JSONL WAL writer for one Session.

Usage:

writer = SessionWriter(session)        # opens <id>.jsonl.__partial__,
                                       # writes header immediately
writer.write_chunk(0, row_0)
writer.write_chunk(1, row_1)
...
writer.close()                          # writes trailer, renames to
                                        # <id>.jsonl atomically

The writer can also be used as a context manager — __exit__ calls close() when no exception is in flight and abandon() otherwise (so a crash midway leaves the __partial__ file behind, marking the session as closed=False on reload).

property path: Path#

Absolute path to the on-disk JSONL file after close().

Note this is the final path. While the writer is still in-flight the file lives at partial_path; readers that want to look at an in-flight session should use that attribute instead.

property partial_path: Path#

Absolute path to the in-flight .__partial__ file.

property chunks_written: int#

Number of record_type=chunk lines flushed so far.

write_chunk(index: int, row: ChunkRow) None[source]#

Append one chunk record.

close() None[source]#

Write the trailer and atomically rename __partial__ → final.

Idempotent.

abandon() None[source]#

Release the file handle WITHOUT writing a trailer or renaming.

Leaves the __partial__ file in place — a visible signal that the writing process crashed mid-session or that the runtime drain timed out. The loader still tolerates this file via load_session().

rath.session.load_session(session_id: UUID | str, *, path: Path | None = None) PersistedSession[source]#

Parse one session JSONL into a PersistedSession.

Pass session_id to look up under the resolved sessions directory, or path to read an explicit file (mainly for tests). The two are mutually exclusive — when both are given, path wins.

Raises PersistenceError for malformed JSON, missing header, or schema-version mismatches. A trailing unterminated line is treated as a crashed-mid-write line and silently skipped; the returned PersistedSession.closed field will be False because no trailer record was observed.

rath.session.list_persisted_sessions() list[PersistedSessionMeta][source]#

Enumerate persisted sessions in the resolved sessions directory.

Sorted by created_at ascending (oldest first). Files that fail to parse are skipped with a logged warning rather than aborting the whole listing.

rath.session.delete_session(session_id: UUID | str, *, path: Path | None = None) bool[source]#

Remove the on-disk JSONL file for session_id.

Returns True when the file existed and was removed, False when it was already absent. Does not touch any associated sandbox dir — pair with PersistentSandboxRegistry.delete_local() when removing the sandbox is also desired.

rath.session.prune_sessions(*, older_than: timedelta) list[UUID][source]#

Delete persisted sessions whose created_at is older than older_than.

Returns the removed session ids in deletion order. Files that fail to parse are skipped (and not pruned — manual cleanup is safer than auto- delete in that case).

class rath.session.PersistedSession(*, header: PersistedSessionHeader, chunk_table: ChunkTable, cumulative_usage: RathLLMTokenUsage | None, closed: bool, path: Path, trailer_raw: dict[str, Any] | None = None)[source]#

Full round-trip view of one persisted session file.

to_resumable_pair(*, agent_prompt: str | None = None) tuple[Session, Session][source]#

Build (user_session, agent_session) ready for run_session_loop.

The user session inherits the persisted chunk_table verbatim (so the loop sees the same transcript). The agent session carries the system prompt (if any) extracted from the persisted history, or agent_prompt if provided to override.

Sandbox handling depends on the recorded backend:

  • opensandbox with a sandbox_handle_id — reattach immediately via PersistentSandboxRegistry.reattach_remote() so the resumed session targets the same remote container instead of spinning up a fresh one. Performs I/O against the registry index file and the OpenSandbox backend’s attach.

  • Local (or no recorded handle) — keep the spec on the unbound session; the next consumer opens lazily.

class rath.session.PersistedSessionHeader(*, schema_version: int, id: UUID, created_at: datetime, parent_session_ids: tuple[UUID, ...], lineage_operator: str, lineage_kind: LineageKind, lineage_extras: tuple[tuple[str, Any], ...], sandbox_backend: str | None, sandbox_spec: BackendSandboxSpec | None, sandbox_handle_id: str | None)[source]#

The record_type=header line, decoded.

class rath.session.PersistedSessionMeta(*, id: UUID, path: Path, created_at: datetime, lineage_operator: str, lineage_kind: LineageKind, chunk_count: int, closed: bool)[source]#

Lightweight summary used by list_persisted_sessions().

Reading meta is cheap: only the header (line 1) is parsed, the rest of the file is scanned only to count chunks and detect a trailer.

exception rath.session.PersistenceError[source]#

Raised when a persisted session file is corrupt or unreadable.

The string carries a human-readable summary including the file path and, where available, the byte offset / line number of the failure. The original json.JSONDecodeError or OSError is chained via __cause__.

rath.session.graph.export.session_to_jsonl_row(session: Session) dict[str, Any][source]#

Project a Session into a JSONable dict for one JSONL row.

rath.session.graph.export.export_jsonl_string(sessions: Iterable[Session]) str[source]#

Return the JSONL text for sessions (one line per session, \n-terminated).

rath.session.graph.export.export_jsonl(sessions: Iterable[Session], path: str | Path) None[source]#

Write JSONL for sessions to path (UTF-8, \n line endings).

rath.session.graph.export.export_journal_jsonl(journal: LineageJournal, path: str | Path, *, skip_unknown: bool = True) None[source]#

Resolve journal.visit_order through the session registry, then export.

Sessions that are not in the global registry are silently skipped when skip_unknown is true (the default - this matches the typical use case where the journal outlives some sessions). Set skip_unknown=False to raise KeyError instead.

← API Reference