Source code for rath.session.chunk

"""Chunk rows for conversation content on each Session."""

from __future__ import annotations

import json
from dataclasses import dataclass
from enum import Enum
from typing import Any, Mapping, cast

from rath.llm import RathLLMMessage, RathLLMToolCallPart


class ChunkKind(str, Enum):
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    TOOL_RESULT = "tool_result"


[docs] @dataclass(frozen=True, slots=True) class ChunkRow: """Immutable row in chronological order.""" kind: ChunkKind payload: dict[str, Any]
def user_text_chunk(text: str) -> ChunkRow: """User message row for :attr:`ChunkKind.USER`.""" return ChunkRow(kind=ChunkKind.USER, payload={"content": text}) def system_text_chunk(text: str) -> ChunkRow: """System prompt row for :attr:`ChunkKind.SYSTEM`.""" return ChunkRow(kind=ChunkKind.SYSTEM, payload={"content": text}) def assistant_turn_chunk( *, tool_calls: tuple[RathLLMToolCallPart, ...] | None, content: str | None = None, ) -> ChunkRow: """Assistant message row; ``tool_calls`` are stored in OpenAI-style wire form.""" wire: list[dict[str, Any]] | None = None if tool_calls: wire = [] for p in tool_calls: wire.append( { "id": p.id, "type": p.type, "function": { "name": p.function.name, "arguments": p.function.arguments, }, } ) return ChunkRow( kind=ChunkKind.ASSISTANT, payload={"content": content, "tool_calls": wire}, ) def tool_feedback_chunk(tool_call_id: str, name: str, body: str) -> ChunkRow: """Tool result chunk for replay into the chat transcript.""" return ChunkRow( kind=ChunkKind.TOOL_RESULT, payload={"tool_call_id": tool_call_id, "name": name, "content": body}, )
[docs] @dataclass(frozen=True, slots=True) class ChunkTable: """Append-only chronological chunk list.""" rows: tuple[ChunkRow, ...] = () def extend(self, *additional: ChunkRow) -> ChunkTable: return ChunkTable(rows=self.rows + tuple(additional))
def _preview_brief(s: str, *, max_chars: int = 256) -> str: """Truncate long single-line previews (for logging / chunk hooks).""" if not s: return "" t = s.replace("\r\n", "\n").replace("\r", "\n").replace("\n", "\\n") if max_chars <= 8 or len(t) <= max_chars: return t edge = max(1, (max_chars - 5) // 2) return f"{t[:edge]} ... {t[-edge:]}" def _tool_result_body_preview(raw: str, *, max_chars: int) -> str: """Decode JSON tool payloads and re-encode with real Unicode (not ``\\u`` escapes).""" t = raw.strip() if not t: return "" try: parsed: Any = json.loads(t) except json.JSONDecodeError: return _preview_brief(raw, max_chars=max_chars) try: normalized = json.dumps( parsed, ensure_ascii=False, separators=(",", ":"), ) except (TypeError, ValueError): return _preview_brief(raw, max_chars=max_chars) return _preview_brief(normalized, max_chars=max_chars) def format_chunk_row_brief(index: int, row: ChunkRow, *, max_payload: int = 400) -> str: """Single-line description of one chunk row (debugging / logging helper).""" kind = row.kind.value p = row.payload if row.kind in (ChunkKind.SYSTEM, ChunkKind.USER): body = _preview_brief(str(p.get("content", "")), max_chars=max_payload) return f"[{index}] {kind}: {body!r}" if row.kind == ChunkKind.ASSISTANT: parts: list[str] = [] c = p.get("content") if c is not None and str(c).strip(): parts.append(f"text={_preview_brief(str(c), max_chars=max_payload)!r}") tc_raw = p.get("tool_calls") or [] if tc_raw: names: list[str] = [] for d in tc_raw: fn = d.get("function") or {} names.append(str(fn.get("name", "?"))) parts.append(f"tools=[{', '.join(names)}]") summary = ", ".join(parts) if parts else "(empty)" return f"[{index}] {kind}: {summary}" if row.kind == ChunkKind.TOOL_RESULT: name = str(p.get("name", "")) body = _tool_result_body_preview( str(p.get("content", "")), max_chars=max_payload ) return f"[{index}] {kind}: name={name!r} body={body}" return f"[{index}] {kind}: {p!r}" def chunk_table_to_messages(tab: ChunkTable) -> tuple[RathLLMMessage, ...]: """Flatten chunk history into Rath LLM wire messages.""" msgs: list[RathLLMMessage] = [] for row in tab.rows: k = row.kind p = row.payload if k == ChunkKind.SYSTEM: msgs.append(RathLLMMessage(role="system", content=str(p["content"]))) elif k == ChunkKind.USER: msgs.append(RathLLMMessage(role="user", content=str(p["content"]))) elif k == ChunkKind.ASSISTANT: content_val = p.get("content") content = None if content_val is None else str(content_val) tc_raw = p.get("tool_calls") tc_tuple: tuple[Mapping[str, Any], ...] | None = None if tc_raw: lst = [cast(Mapping[str, Any], dict(d)) for d in tc_raw] tc_tuple = tuple(lst) msgs.append( RathLLMMessage( role="assistant", content=content, tool_calls=tc_tuple, ) ) elif k == ChunkKind.TOOL_RESULT: msgs.append( RathLLMMessage( role="tool", content=str(p["content"]), tool_call_id=str(p["tool_call_id"]), ) ) return tuple(msgs)