rath.session#
Session state, chunk transcript, session loop, lazy materialization, context compression, and lineage graph.
Source#
Module |
Source |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Public contract#
Session#
Field |
Type |
Meaning |
|---|---|---|
|
|
Chronological chunk rows. |
|
|
Session identity. |
|
|
Currently open sandbox handle. |
|
|
Backend name used for lazy open. |
|
|
Lineage parents. |
|
|
Operation that produced the current session. |
|
|
Lineage operation kind. |
|
|
Running token usage accumulated by loop/compress and summed by |
Method |
Returns |
Behavior |
|---|---|---|
|
|
Creates a single |
|
|
Creates a single |
|
|
Sets the sandbox target and releases the current handle (refcount − 1). |
|
|
Releases the current handle and takes a reference on |
|
|
Returns or lazily opens the current sandbox; acquires one reference on first open. |
|
|
Drops this session’s reference; the backend closes when the count reaches zero. |
|
|
Copies chunk rows and shares the sandbox reference (refcount + 1); parent points to the source session. |
|
|
Copies chunk rows and shares the sandbox reference; creates a new lineage root. |
|
|
Concatenates |
Session.merge(...) joins compatible branches, preserves parent lineage, and
keeps sandbox ownership explicit.#
Chunk helpers#
Function |
Returns |
Purpose |
|---|---|---|
|
|
Creates a user row. |
|
|
Creates a system row. |
|
|
Creates an assistant row. |
|
|
Creates a tool result row. |
|
|
Converts to chat completion messages. |
Loop#
run_session_loop(
user_session: Session,
agent_session: Session,
*,
agent_provider: Provider,
tools: list[FlowToolCall] | None = None,
executor: SessionLoopExecutor | None = None,
max_tool_rounds: int = 16,
on_event: Callable[[RathLLMStreamDelta], None] | None = None,
persist: bool = False,
persist_path: Path | None = None,
sandbox_handle_id: str | None = None,
) -> Session
Parameter |
Description |
|---|---|
|
User-side transcript and sandbox placement. |
|
Agent/system transcript used in request assembly. |
|
Model and request parameters. |
|
Additional |
|
Replacement point for completion and tool dispatch. |
|
Maximum number of tool-call rounds. |
|
When set, each streamed |
|
When truthy, every appended chunk is written to |
|
Optional sandbox identifier persisted in the JSONL header for later reattach. |
The output session shares the input user session’s sandbox reference (refcount + 1). The returned Session starts with the user rows, then appends assistant rows and tool_result rows. The output session lineage parents are the user session and agent session.
executor and on_event are mutually exclusive unless the caller manually wraps a streaming client in StreamingExecutor. When on_event is provided, the resolved client must implement complete_stream(req); the built-in OpenAI-compatible and Anthropic clients both support this path in v1.2.
The public API remains synchronous. Internally, v1.2 can return a lazy output session while tool/model work continues on the private async runtime. Session identity, lineage metadata, and sandbox ownership are available immediately; reading chunk_table, reading cumulative_usage, or calling synchronize() waits for pending work.
Compression#
run_session_compress(
user_session: Session,
agent_session: Session,
*,
agent_provider: Provider,
executor: SessionLoopExecutor | None = None,
compress_instruction: str | None = None,
register_sessions: bool = True,
on_event: Callable[[RathLLMStreamDelta], None] | None = None,
persist: bool = False,
persist_path: Path | None = None,
sandbox_handle_id: str | None = None,
) -> Session
Returns a user-only session. The compression request uses tools=None and tool_choice="none". A model response with tool calls raises RuntimeError. on_event / persist behavior matches run_session_loop.
Persistence#
Session persistence writes a header, chunk records, and a trailer to JSONL; a missing trailer marks an interrupted run.#
Async persistence also writes .__partial__ marker files beside in-flight JSONL files. The marker is removed after the trailer is committed, so startup/listing code can distinguish a clean close from a crash or interrupted async write.
API |
Behavior |
|---|---|
|
Opens an append-only JSONL writer and writes a header. |
|
Parses a persisted session JSONL file into |
|
Lists header metadata and closed/crashed state for stored sessions. |
|
Deletes one persisted session file. |
|
Deletes persisted sessions older than a cutoff. |
|
Builds |
Persistence records are newline-delimited JSON:
Record |
Meaning |
|---|---|
|
Session id, lineage, sandbox backend/spec, optional sandbox handle id. |
|
One appended chunk row. Inherited user rows are seeded before new loop output rows. |
|
Graceful close marker plus final cumulative usage. Missing trailer means the process likely crashed mid-write. |
Lineage export#
API |
Behavior |
|---|---|
|
Projects one session into a JSON-ready lineage row. |
|
Returns JSONL for an iterable of sessions. |
|
Writes lineage JSONL to disk. |
|
Resolves |
Exceptions and edge behavior#
Location |
Behavior |
|---|---|
|
Raises |
|
Keeps |
Lazy output reads |
Reading |
|
Raises |
|
Non-JSON tool arguments, unknown tools, and tool execution exceptions are written as JSON error |
|
Empty model content, tool calls, and unexpected finish reasons raise |
Autodoc#
- class rath.session.Session(chunk_table: ChunkTable, *, id: UUID | None = None, sandbox: BackendSandbox | None = None, sandbox_backend: str | None = None, _sandbox_open_spec: BackendSandboxSpec | None = None, _cm_depth: int = 0, lineage: SessionLineage | None = None, parent_session_ids: tuple[UUID, ...] = (), lineage_operator: str = 'implicit', lineage_kind: LineageKind = LineageKind.UNKNOWN, lineage_extras: tuple[tuple[str, Any], ...] = (), cumulative_usage: RathLLMTokenUsage | None = None)[source]#
Chunk transcript (
chunk_table), optional sandbox, and lineage metadata.Sandbox placement is torch-like:
sandbox_backendisNoneuntil you callto()(orbind_sandbox()). The handle insandboxis opened lazily on first use (require_sandbox()or enteringwith session:). Everyself.sandboxslot counts as one reference on theBackendSandboxinstance;close_sandbox()drops it, and the backendcloseis called only when the reference count reaches zero.with session:is optional; when used, the outermost exit callsclose_sandbox().Lazy materialization: when a session is returned from
run_session_loop(), it may carry an in-flightLazyValuein_pending. Readingchunk_tableorcumulative_usagecallssynchronize(), which blocks until the runtime publishes the materialized values. Lineage attributes (parent_session_ids,lineage_operator,lineage_kind,lineage_extras) are eager and never trigger synchronize.Sharing semantics:
run_session_loop(),run_session_compress(),fork(),detach(), andmerge()all bind the new session to the same sandbox object as the source (refcount + 1). The source session keeps its reference.detach()differs fromfork()only in lineage:fork()recordsparent_session_ids=(self.id,);detach()records an empty parent tuple.merge()always keepsself.sandbox(the first session’s);other.sandboxis ignored, andotherkeeps its own reference.Flat lineage (preferred graph substrate):
parent_session_ids(ordered parents),lineage_operator,lineage_kind,lineage_extras.lineageis an optional legacy DTO tying loop outputs to producer sessions.- property chunk_table: ChunkTable#
Materialized transcript. Blocks on
_pendingif still in flight.Re-entrancy: when a tool dispatched by the runtime reads
session.chunk_tablefrom inside its own producing future,synchronize()would deadlock (the future cannot complete until the tool returns). In that case we read the in-flight_chunk_tabledirectly — tools see the transcript as it grows.
- synchronize() Session[source]#
Block until
_pendingresolves; publish staged values; return self.Idempotent — repeated calls (including from multiple threads) only materialize once. Exceptions raised by the in-flight future are re-raised here (after
_pendingis cleared so subsequent reads do not block again).
- classmethod create(kind: str = 'user', text: str = '') Session[source]#
Friendly single-entry constructor with lineage stamping.
kindis one of:"user"— single USER chunk holdingtext; stampsLEAF_USER."system"— single SYSTEM chunk holdingtext; stampsLEAF_SYSTEM."empty"— zero-row transcript;textis ignored; no lineage stamp.
The returned session is unbound (no sandbox). Chain
.to(backend)to pick a backend; the handle opens lazily on first use orwith session:.
- to(backend: str = 'local', *, spec: BackendSandboxSpec | str | None = None) Session[source]#
Close any current handle, set target backend, and return
self(chainable).
- close_sandbox() Session[source]#
Drop this session’s sandbox reference; close when refcount hits zero.
- bind_sandbox(sandbox: BackendSandbox) Session[source]#
Take a reference to
sandbox(refcount + 1); release the previous one.
- detach() Session[source]#
Duplicate transcript with a fresh lineage root; share the sandbox reference.
- merge(other: Session) Session[source]#
Concatenate
self.rows + other.rowsinto a new session.The merged session always keeps
self.sandbox— the first session’s.other.sandboxis ignored regardless of whether it is the same instance, a different one, orNone;otherkeeps its own reference. Refcount onself.sandboxis bumped by 1 when set.cumulative_usageis summed across both inputs. Lineage parents are(self.id, other.id), kind isLineageKind.OP_MERGE.The only remaining hard constraint: when both sessions are unbound and they declare different
sandbox_backendtargets, merging is ambiguous — raisesValueError.
- class rath.session.ChunkRow(kind: ChunkKind, payload: dict[str, Any])[source]#
Immutable row in chronological order.
- class rath.session.ChunkTable(rows: tuple[ChunkRow, ...] = ())[source]#
Append-only chronological chunk list.
- rath.session.run_session_loop(user_session: Session, agent_session: Session, *, agent_provider: Provider, tools: list[FlowToolCall] | None = None, executor: SessionLoopExecutor | None = None, max_tool_rounds: int = 64, on_event: Callable[[RathLLMStreamDelta], None] | None = None, persist: bool = False, persist_path: Path | None = None, sandbox_handle_id: str | None = None, lazy: bool = True) Session[source]#
Run one multi-turn assistant pass with optional tool rounds.
Built-in tools come from
global_system_tools(); pass instantiatedFlowToolCallobjects intoolsto add or override. Shadowing built-in names is disallowed.Shares the
BackendSandboxfromuser_sessionwith the returned session (refcount + 1); the user session keeps its reference and either side canSession.close_sandbox()independently. LLM routing kwargs come fromagent_provider; completions and tool dispatch go throughexecutor(a freshDefaultSessionLoopExecutoris built when omitted).When
on_eventis provided, completions stream — the resolved client must satisfyStreamingChatClient, otherwise aTypeErroris raised before any session is registered. EachRathLLMStreamDeltais forwarded toon_event; chunks are still appended atomically (one accumulated assistant message per round).When
persistis true orpersist_pathis given, every appended row is written to.openrath/sessions/<out.id>.jsonl(or topersist_path). On graceful return the trailer is written; on exception the file is abandoned without a trailer.Message assembly concatenates
agent_session.chunk_tableahead of the user rows for the LLM; head rows stay out ofout.chunk_table.When
lazy=True(the default), the returnedSessionis a lazy handle: lineage attributes,id, andsandboxare set immediately, but the transcript materialises only when the caller readsout.chunk_table(or callsout.synchronize()). The runtime executes the loop on a background asyncio loop so multiplerun_session_loopcalls can overlap.
- class rath.session.SessionLoopExecutor(*args, **kwargs)[source]#
Runs completions and tool dispatch used by
run_session_loop.- complete(req: RathLLMChatRequest) RathLLMChatResponse[source]#
Run one chat completion.
- dispatch_tool(session: Session, tool: FlowToolCall, arguments: Mapping[str, Any]) Any[source]#
Run
toolwith JSONarguments(typicallytool(session, arguments)).
- tool_schemas() tuple[RathLLMFunctionTool, ...][source]#
Tool specs for OpenAI-style
tools. Empty tuple defers to the loop-local merged registry.
- class rath.session.loop.StreamingExecutor(client: StreamingChatClient, on_event: Callable[[RathLLMStreamDelta], None], inner: SessionLoopExecutor | None = None)[source]#
Adapt a
StreamingChatClientto theSessionLoopExecutorprotocol.complete()consumes the client’scomplete_stream(req), forwards each delta toon_event, and returns the accumulated response. Tool dispatch and schema lookup are delegated to an inner executor (a freshDefaultSessionLoopExecutorwrapping the same client when one is not supplied).
- rath.session.run_session_compress(user_session: Session, agent_session: Session, *, agent_provider: Provider, executor: SessionLoopExecutor | None = None, compress_instruction: str | None = None, register_sessions: bool = True, on_event: Callable[[RathLLMStreamDelta], None] | None = None, persist: bool = False, persist_path: Path | None = None, sandbox_handle_id: str | None = None) Session[source]#
Summarize transcript via LLM into a new user-only session (no SYSTEM chunks).
agent_sessionanduser_sessionchunks are folded into the completion request only — they are not copied intoout.chunk_table. The returned session contains a single USER row built from the model reply.Completions use
tools=Noneandtool_choice=none. If the model returns tool calls, raisesRuntimeError.When
executorisNone, a default executor is built fromagent_provider; it must carry a non-emptyapi_key.Shares the
BackendSandboxfromuser_sessionwith the returned session (refcount + 1) when one is bound; the user session keeps its reference.When
on_eventis provided, the completion streams — the resolved client must satisfyStreamingChatClient. EachRathLLMStreamDeltais forwarded toon_event.When
persistis true orpersist_pathis given, the single output row is written to.openrath/sessions/<out.id>.jsonl(or topersist_path) with a trailer.
- rath.session.create_user_session(message: str) Session[source]#
Leaf user transcript; stamps
LEAF_USERwhen lineage mode is on.
- rath.session.create_system_session(prompt: str) Session[source]#
Leaf system transcript; stamps
LEAF_SYSTEMwhen lineage mode is on.
- class rath.session.SessionWriter(session: Session, *, sandbox_handle_id: str | None = None, path: Path | None = None)[source]#
Append-only JSONL WAL writer for one
Session.Usage:
writer = SessionWriter(session) # opens <id>.jsonl.__partial__, # writes header immediately writer.write_chunk(0, row_0) writer.write_chunk(1, row_1) ... writer.close() # writes trailer, renames to # <id>.jsonl atomically
The writer can also be used as a context manager —
__exit__callsclose()when no exception is in flight andabandon()otherwise (so a crash midway leaves the__partial__file behind, marking the session asclosed=Falseon reload).- property path: Path#
Absolute path to the on-disk JSONL file after
close().Note this is the final path. While the writer is still in-flight the file lives at
partial_path; readers that want to look at an in-flight session should use that attribute instead.
- rath.session.load_session(session_id: UUID | str, *, path: Path | None = None) PersistedSession[source]#
Parse one session JSONL into a
PersistedSession.Pass
session_idto look up under the resolved sessions directory, orpathto read an explicit file (mainly for tests). The two are mutually exclusive — when both are given,pathwins.Raises
PersistenceErrorfor malformed JSON, missing header, or schema-version mismatches. A trailing unterminated line is treated as a crashed-mid-write line and silently skipped; the returnedPersistedSession.closedfield will beFalsebecause no trailer record was observed.
- rath.session.list_persisted_sessions() list[PersistedSessionMeta][source]#
Enumerate persisted sessions in the resolved sessions directory.
Sorted by
created_atascending (oldest first). Files that fail to parse are skipped with a logged warning rather than aborting the whole listing.
- rath.session.delete_session(session_id: UUID | str, *, path: Path | None = None) bool[source]#
Remove the on-disk JSONL file for
session_id.Returns
Truewhen the file existed and was removed,Falsewhen it was already absent. Does not touch any associated sandbox dir — pair withPersistentSandboxRegistry.delete_local()when removing the sandbox is also desired.
- rath.session.prune_sessions(*, older_than: timedelta) list[UUID][source]#
Delete persisted sessions whose
created_atis older thanolder_than.Returns the removed session ids in deletion order. Files that fail to parse are skipped (and not pruned — manual cleanup is safer than auto- delete in that case).
- class rath.session.PersistedSession(*, header: PersistedSessionHeader, chunk_table: ChunkTable, cumulative_usage: RathLLMTokenUsage | None, closed: bool, path: Path, trailer_raw: dict[str, Any] | None = None)[source]#
Full round-trip view of one persisted session file.
- to_resumable_pair(*, agent_prompt: str | None = None) tuple[Session, Session][source]#
Build
(user_session, agent_session)ready forrun_session_loop.The user session inherits the persisted chunk_table verbatim (so the loop sees the same transcript). The agent session carries the system prompt (if any) extracted from the persisted history, or
agent_promptif provided to override.Sandbox handling depends on the recorded backend:
opensandboxwith asandbox_handle_id— reattach immediately viaPersistentSandboxRegistry.reattach_remote()so the resumed session targets the same remote container instead of spinning up a fresh one. Performs I/O against the registry index file and the OpenSandbox backend’sattach.Local (or no recorded handle) — keep the spec on the unbound session; the next consumer opens lazily.
- class rath.session.PersistedSessionHeader(*, schema_version: int, id: UUID, created_at: datetime, parent_session_ids: tuple[UUID, ...], lineage_operator: str, lineage_kind: LineageKind, lineage_extras: tuple[tuple[str, Any], ...], sandbox_backend: str | None, sandbox_spec: BackendSandboxSpec | None, sandbox_handle_id: str | None)[source]#
The
record_type=headerline, decoded.
- class rath.session.PersistedSessionMeta(*, id: UUID, path: Path, created_at: datetime, lineage_operator: str, lineage_kind: LineageKind, chunk_count: int, closed: bool)[source]#
Lightweight summary used by
list_persisted_sessions().Reading meta is cheap: only the header (line 1) is parsed, the rest of the file is scanned only to count chunks and detect a trailer.
- exception rath.session.PersistenceError[source]#
Raised when a persisted session file is corrupt or unreadable.
The string carries a human-readable summary including the file path and, where available, the byte offset / line number of the failure. The original
json.JSONDecodeErrororOSErroris chained via__cause__.
- rath.session.graph.export.session_to_jsonl_row(session: Session) dict[str, Any][source]#
Project a
Sessioninto a JSONable dict for one JSONL row.
- rath.session.graph.export.export_jsonl_string(sessions: Iterable[Session]) str[source]#
Return the JSONL text for
sessions(one line per session,\n-terminated).
- rath.session.graph.export.export_jsonl(sessions: Iterable[Session], path: str | Path) None[source]#
Write JSONL for
sessionstopath(UTF-8,\nline endings).
- rath.session.graph.export.export_journal_jsonl(journal: LineageJournal, path: str | Path, *, skip_unknown: bool = True) None[source]#
Resolve
journal.visit_orderthrough the session registry, then export.Sessions that are not in the global registry are silently skipped when
skip_unknownis true (the default - this matches the typical use case where the journal outlives some sessions). Setskip_unknown=Falseto raiseKeyErrorinstead.