from __future__ import annotations
from collections.abc import Callable
from dataclasses import replace
from typing import Union
from rath.flow.agent_param import AgentParam
from rath.flow.memory_inject import DefaultRecallInjection, MemoryInjectionPolicy
from rath.flow.tool import FlowToolCall
from rath.flow.workflow import Workflow
from rath.llm import RathLLMStreamDelta
from rath.llm.provider import Provider
from rath.memory import current as _current_memory_backend
from rath.memory import get as _get_memory_backend
from rath.memory.abc import MemoryStore, MemoryStoreSpec
from rath.session import Session, run_session_loop
# Accepted forms of a ``memory=`` argument: an already-open store, a store
# spec, a string name (looked up by ``_resolve_memory``), or ``None`` for
# "no memory".
MemoryArg = Union[MemoryStore, MemoryStoreSpec, str, None]
def _resolve_memory(memory: MemoryArg) -> MemoryStore | None:
"""Resolve any of the accepted ``memory=`` forms to an open store.
Acquires one refcount on the returned store; the caller — usually
:class:`Agent` — is responsible for releasing it via
:meth:`MemoryStore.release` (or its own ``close()``).
"""
if memory is None:
return None
if isinstance(memory, MemoryStore):
return memory.acquire()
if isinstance(memory, str):
from rath.config.store import ConfigStore
cfg = ConfigStore.load()
if memory in cfg.config.memory.providers:
spec = MemoryStoreSpec.from_config(memory, store=cfg)
kind = cfg.get_memory_provider(memory).backend_kind
backend = _get_memory_backend(kind)
return backend.open(spec).acquire()
backend = _get_memory_backend(memory)
return backend.open().acquire()
if isinstance(memory, MemoryStoreSpec):
backend = _current_memory_backend()
return backend.open(memory).acquire()
raise TypeError(
f"memory must be a MemoryStore, MemoryStoreSpec, backend name, or None; got {type(memory).__name__}"
)
class Agent(Workflow):
    """Single-agent workflow: one system prompt, one provider, optional tools.

    An optional memory store can be bound at construction time. The agent
    then holds one reference on that store until :meth:`close` is called;
    using the agent as a context manager calls :meth:`close` on exit.
    """

    # tests-only seam; production paths leave it None
    _executor_override = None

    def __init__(
        self,
        system_prompt: str,
        provider: Provider | None = None,
        tools: list[FlowToolCall] | None = None,
        *,
        model: str | None = None,
        on_event: Callable[[RathLLMStreamDelta], None] | None = None,
        memory: MemoryArg = None,
        memory_inject: MemoryInjectionPolicy | None = None,
        commit_on_forward: bool = False,
    ):
        """Build a single-agent workflow.

        ``provider`` may be a fully-formed :class:`~rath.llm.Provider` (the
        explicit form used when you want to control api_key, base_url,
        sampling, etc.) or omitted in favor of the shortcut ``model="..."``
        kwarg, which constructs a :class:`Provider` with just that model name.
        When both are given, ``model`` only fills in a provider whose
        ``model`` is ``None``; a provider that already names a model is used
        unchanged.

        ``api_key`` and ``base_url`` left empty on the Provider fall back to
        the ``OPENAI_API_KEY`` / ``OPENAI_BASE_URL`` environment variables,
        then to ``llm.default_provider`` in ``~/.openrath/config.json``
        (see :mod:`rath.config`), inside
        :class:`~rath.llm.RathOpenAIChatClient`, so::

            flow.Agent("Use tools when helpful.", model="gpt-5.5")

        is the minimal form that works once the environment is configured.

        ``tools`` is copied into a new list, so later changes to the
        caller's list do not affect the agent.

        ``on_event`` enables streaming: each forward pass invokes the loop
        with the callback wired up. The resolved chat client must satisfy
        :class:`~rath.llm.StreamingChatClient`.

        ``memory`` accepts an open store, a store spec, a string name, or
        ``None``; it is resolved by ``_resolve_memory``, which acquires one
        reference that :meth:`close` later releases.

        ``memory_inject`` is the policy consulted before each forward pass
        when a store is bound; a falsy value selects
        :class:`DefaultRecallInjection`.

        ``commit_on_forward`` makes every forward pass commit the resulting
        session into the bound store (without waiting) after the loop
        returns.

        Raises:
            ValueError: if neither ``provider`` nor ``model`` is given.
        """
        super().__init__()
        if provider is None and model is None:
            raise ValueError(
                'flow.Agent requires either provider=Provider(...) or model="..."',
            )
        if provider is None:
            provider = Provider(model=model)
        elif model is not None and provider.model is None:
            provider = replace(provider, model=model)

        self.tools = list(tools or [])
        self._on_event = on_event
        self._memory_inject: MemoryInjectionPolicy = (
            memory_inject or DefaultRecallInjection()
        )
        self._commit_on_forward = commit_on_forward

        resolved_memory = _resolve_memory(memory)
        self.memory = resolved_memory
        try:
            self.agent = AgentParam(
                agent_session=Session.from_agent_prompt(system_prompt),
                provider=provider,
                memory=resolved_memory,
            )
        except BaseException:
            # Construction failed after the store reference was acquired:
            # nobody will ever call close() on this half-built agent, so
            # give the reference back here instead of leaking it.
            self.memory = None
            if resolved_memory is not None:
                resolved_memory.release()
            raise

    def forward(self, session: Session) -> Session:
        """Run one pass of the session loop for ``session`` and return the result.

        When a memory store is bound, the injection policy may first prepend
        chunks to a fork of ``session``; the loop then runs on that fork.
        When ``commit_on_forward`` was set and a store is bound, the
        resulting session is committed to memory (best-effort, not waited
        on) before it is returned.
        """
        effective = (
            self._inject_memory_into(session) if self.memory is not None else session
        )
        loop_kwargs: dict[str, object] = {
            "user_session": effective,
            "agent_session": self.agent.agent_session,
            "agent_provider": self.agent.provider,
            "tools": self.tools,
            "on_event": self._on_event,
        }
        # Only pass ``executor`` when the test seam is populated so the
        # loop's own default applies otherwise.
        if self._executor_override is not None:
            loop_kwargs["executor"] = self._executor_override
        out = run_session_loop(**loop_kwargs)  # type: ignore[arg-type]
        if self._commit_on_forward and self.memory is not None:
            self._commit_session(out, wait=False)
        return out

    def _inject_memory_into(self, session: Session) -> Session:
        """Return a forked session with policy-supplied chunks prepended.

        Returns ``session`` itself, unforked, when no store is bound, when
        the policy yields nothing, or when the policy fails (the failure is
        logged, never raised).
        """
        memory = self.memory
        if memory is None:
            return session
        try:
            produced = self._memory_inject.inject(session, memory)
            # Materialise inside the ``try`` so a lazy iterable that fails
            # part-way is treated like any other recall failure.
            extras = tuple(produced) if produced else ()
        except Exception:  # noqa: BLE001 -- recall must not break the loop
            import logging

            logging.getLogger(__name__).warning(
                "memory injection failed; continuing without recalled context",
                exc_info=True,
            )
            extras = ()
        if not extras:
            return session

        from rath.session.chunk import ChunkTable as _CT

        merged = _CT(rows=extras + session.chunk_table.rows)
        forked = session.fork()
        forked.chunk_table = merged
        return forked

    @staticmethod
    def _session_messages(session: Session) -> tuple[dict[str, object], ...]:
        """Convert the system/user/assistant rows of ``session`` to role dicts.

        Rows of any other kind are skipped. A missing ``content`` becomes
        ``""`` for system and user rows and ``None`` for assistant rows.
        """
        from rath.session.chunk import ChunkKind as _CK

        messages: list[dict[str, object]] = []
        for row in session.chunk_table.rows:
            if row.kind == _CK.SYSTEM:
                messages.append(
                    {"role": "system", "content": row.payload.get("content", "")}
                )
            elif row.kind == _CK.USER:
                messages.append(
                    {"role": "user", "content": row.payload.get("content", "")}
                )
            elif row.kind == _CK.ASSISTANT:
                messages.append(
                    {"role": "assistant", "content": row.payload.get("content")}
                )
        return tuple(messages)

    def _commit_session(self, session: Session, *, wait: bool) -> None:
        """Best-effort commit of ``session``'s chat history into memory.

        Does nothing when no store is bound. A failing dispatch is logged
        and swallowed so that it cannot break ``forward``.
        """
        from rath.memory.op_types import MemoryOpCommit

        memory = self.memory
        if memory is None:
            return
        messages = self._session_messages(session)
        try:
            memory.dispatch(
                MemoryOpCommit(
                    session_id=str(session.id),
                    messages=messages,
                    wait=wait,
                )
            )
        except Exception:  # noqa: BLE001 -- commit must not break forward
            import logging

            logging.getLogger(__name__).warning(
                "memory commit failed; session was not committed",
                exc_info=True,
            )

    # ---------------------------------------------------------------- public memory API

    def remember_memory(
        self,
        content: str,
        *,
        scope: str = "user",
        category: str = "preferences",
        wait: bool = False,
    ) -> object:
        """Persist a free-form note under ``memory://{scope}/memories/{category}/...``.

        ``scope`` is intentionally permissive (``user`` / ``agent`` /
        ``session``) so user code can decide which namespace to target;
        the URI prefix is adapter-coupled and other backends may rewrite
        it. See :class:`~rath.memory.adapters.openviking.OpenVikingBackend`.

        Returns whatever the store's ``dispatch`` returns for the write.

        Raises:
            RuntimeError: if the agent has no memory store.
        """
        from rath.memory.op_types import MemoryOpWrite

        store = self._require_memory()
        uri = self._memory_uri(scope=scope, category=category)
        return store.dispatch(MemoryOpWrite(uri=uri, content=content, wait=wait))

    def recall_memory(
        self,
        query: str,
        *,
        top_k: int = 4,
        target_uri: str | None = None,
    ) -> object:
        """Issue a ``MemoryOpFind`` against the bound store and return the result.

        Raises:
            RuntimeError: if the agent has no memory store.
        """
        from rath.memory.op_types import MemoryOpFind

        store = self._require_memory()
        return store.dispatch(
            MemoryOpFind(query=query, top_k=top_k, target_uri=target_uri)
        )

    def commit_memory(self, session: Session, *, wait: bool = False) -> object:
        """Commit ``session``'s chat transcript into memory for extraction.

        Unlike the automatic commit done by ``forward``, errors from the
        store propagate to the caller.

        Raises:
            RuntimeError: if the agent has no memory store.
        """
        from rath.memory.op_types import MemoryOpCommit

        store = self._require_memory()
        return store.dispatch(
            MemoryOpCommit(
                session_id=str(session.id),
                messages=self._session_messages(session),
                wait=wait,
            )
        )

    def _require_memory(self) -> MemoryStore:
        """Return the bound store, or raise ``RuntimeError`` when there is none."""
        if self.memory is None:
            raise RuntimeError("agent has no memory store")
        return self.memory

    @staticmethod
    def _memory_uri(*, scope: str, category: str) -> str:
        """Build a fresh ``memory://`` URI ending in a random 8-hex-digit slug."""
        import uuid as _uuid

        slug = _uuid.uuid4().hex[:8]
        return f"memory://{scope}/memories/{category}/{slug}"

    # ---------------------------------------------------------------- lifecycle

    def close(self) -> None:
        """Release the memory store reference acquired in ``__init__`` (idempotent)."""
        mem = self.memory
        if mem is None:
            return
        # Clear the attribute before releasing so a second close() is a no-op.
        self.memory = None
        # Also drop the reference from the AgentParam mirror so repr stops
        # showing a stale handle once the store has been released.
        try:
            self.agent.memory = None
        except Exception:  # noqa: BLE001 -- agent_param may already be gone
            pass
        mem.release()

    def __enter__(self) -> "Agent":
        """Return the agent itself; the paired ``__exit__`` closes it."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: object | None,
    ) -> None:
        """Close the agent on leaving the ``with`` block; exceptions propagate."""
        self.close()

    def register_tool(self, tool: FlowToolCall) -> None:
        """Append ``tool`` unless a tool with the same name is already registered."""
        if any(t.name == tool.name for t in self.tools):
            return
        self.tools.append(tool)

    def unregister_tool(self, tool_name: str) -> None:
        """Remove every registered tool whose name equals ``tool_name``."""
        self.tools = [t for t in self.tools if t.name != tool_name]