# Source code for rath.session.graph.export
"""JSONL (newline-delimited JSON) export for session lineage graphs.
One Session per line. Edges are not materialized - ``parent_session_ids`` on
each row implies them - so the format is friendly to ``jq``, streaming
parsers, and naive Mermaid converters. Pair with
:class:`~rath.session.graph.LineageJournal` to dump only the sessions
visited inside a given block.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Iterable
from rath.session.graph.recording import LineageJournal
from rath.session.manager import session_registry
from rath.session.session import Session
# Public API: the names a star-import of this module exposes. Every entry is
# a function defined in this module.
__all__ = [
"session_to_jsonl_row",
"export_jsonl_string",
"export_jsonl",
"export_journal_jsonl",
"cumulative_usage_to_jsonable",
"lineage_extras_to_jsonable",
]
def cumulative_usage_to_jsonable(session: Session) -> dict[str, int] | None:
    """Return the session's cumulative token usage as a plain dict, or ``None``.

    The private ``_cumulative_usage`` attribute is read on purpose rather than
    a public accessor: persistence-layer callers may run inside an in-flight
    materialization (``session._pending`` still set), and this direct read
    keeps them from recursively tripping :meth:`Session.synchronize`.

    Args:
        session: The session whose usage counters should be projected.

    Returns:
        ``None`` when ``session._cumulative_usage`` is ``None``; otherwise a
        dict with the keys ``prompt_tokens``, ``completion_tokens`` and
        ``total_tokens`` (in that order), copied from the usage object.
    """
    totals = session._cumulative_usage
    if totals is not None:
        # Tuple order fixes the key order of the emitted dict.
        return {
            field: getattr(totals, field)
            for field in ("prompt_tokens", "completion_tokens", "total_tokens")
        }
    return None
def lineage_extras_to_jsonable(
    extras: tuple[tuple[str, Any], ...],
) -> list[list[Any]]:
    """Turn ``lineage_extras`` key/value pairs into a JSON-ready list of pairs.

    Each value is probe-serialized with :func:`json.dumps`; if that raises
    ``TypeError`` or ``ValueError`` the value is replaced by ``str(value)``
    so the enclosing row can always be dumped. Keys are always passed
    through ``str``.

    Args:
        extras: Iterable of ``(key, value)`` pairs.

    Returns:
        A list of two-element lists ``[str(key), value_or_its_str]`` in the
        same order as ``extras``.
    """

    def _jsonable(candidate: Any) -> Any:
        # Keep the value untouched when it serializes; otherwise stringify.
        try:
            json.dumps(candidate)
        except (TypeError, ValueError):
            return str(candidate)
        return candidate

    pairs: list[list[Any]] = []
    for name, item in extras:
        # Resolve the value before stringifying the key (original order).
        safe_item = _jsonable(item)
        pairs.append([str(name), safe_item])
    return pairs
[docs]
def session_to_jsonl_row(session: Session) -> dict[str, Any]:
    """Build the JSON-ready mapping that represents ``session`` on one JSONL line.

    Keys, in emission order:

    * ``id`` - ``str(session.id)``
    * ``parent_session_ids`` - each parent id passed through ``str``
    * ``lineage_operator`` - copied as-is
    * ``lineage_kind`` - the ``.value`` of ``session.lineage_kind``
    * ``lineage_extras`` - via :func:`lineage_extras_to_jsonable`
    * ``chunk_count`` - ``len(session.chunk_table.rows)``
    * ``cumulative_usage`` - via :func:`cumulative_usage_to_jsonable`
    """
    row: dict[str, Any] = {}
    row["id"] = str(session.id)
    row["parent_session_ids"] = list(map(str, session.parent_session_ids))
    row["lineage_operator"] = session.lineage_operator
    row["lineage_kind"] = session.lineage_kind.value
    row["lineage_extras"] = lineage_extras_to_jsonable(session.lineage_extras)
    row["chunk_count"] = len(session.chunk_table.rows)
    row["cumulative_usage"] = cumulative_usage_to_jsonable(session)
    return row
[docs]
def export_jsonl_string(sessions: Iterable[Session]) -> str:
    """Render ``sessions`` as JSONL text: one JSON object per ``\\n``-terminated line.

    Each session is projected with :func:`session_to_jsonl_row` and dumped
    with ``ensure_ascii=False``, so non-ASCII characters are kept verbatim.
    An empty iterable yields the empty string.
    """
    # Appending the terminator per line equals joining on "\n" and adding a
    # final "\n" only when there is at least one line.
    return "".join(
        json.dumps(session_to_jsonl_row(item), ensure_ascii=False) + "\n"
        for item in sessions
    )
[docs]
def export_jsonl(sessions: Iterable[Session], path: str | Path) -> None:
    """Write JSONL for ``sessions`` to ``path`` (UTF-8, ``\\n`` line endings).

    Missing parent directories of ``path`` are created. The full text is
    rendered via :func:`export_jsonl_string` before the file is opened, so a
    failure while serializing does not truncate an existing file.

    Args:
        sessions: Sessions to export, one per output line.
        path: Destination file; overwritten if it already exists.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    text = export_jsonl_string(sessions)
    # newline="\n" turns off newline translation. Plain text mode (as used by
    # Path.write_text) rewrites "\n" to os.linesep, i.e. "\r\n" on Windows,
    # which would contradict the documented line endings.
    with target.open("w", encoding="utf-8", newline="\n") as handle:
        handle.write(text)
[docs]
def export_journal_jsonl(
    journal: LineageJournal,
    path: str | Path,
    *,
    skip_unknown: bool = True,
) -> None:
    """Export the sessions named by ``journal.visit_order`` as JSONL at ``path``.

    Each id in ``journal.visit_order`` is looked up with ``get`` on the
    object returned by :func:`session_registry`; the resolved sessions are
    written, in visit order, through :func:`export_jsonl`.

    Args:
        journal: Journal whose ``visit_order`` supplies the session ids.
        path: Destination file, forwarded to :func:`export_jsonl`.
        skip_unknown: When true (the default), ids the registry cannot
            resolve are silently dropped - the journal may outlive some
            sessions. When false, the first such id raises.

    Raises:
        KeyError: If ``skip_unknown`` is false and an id is not in the
            registry. Nothing is written in that case, because the lookup
            loop finishes before the export starts.
    """
    registry = session_registry()
    resolved: list[Session] = []
    for sid in journal.visit_order:
        found = registry.get(sid)
        if found is not None:
            resolved.append(found)
        elif not skip_unknown:
            raise KeyError(f"session {sid} not in registry")
    export_jsonl(resolved, path)