Source code for rath.memory.abc

"""Memory-backend abstract base, store handle, and store spec.

Parallel to :mod:`rath.backend.abc`: :class:`MemoryStore` mirrors
:class:`~rath.backend.abc.BackendSandbox` (refcount handle),
:class:`MemoryStoreSpec` mirrors
:class:`~rath.backend.abc.BackendSandboxSpec`, and :class:`MemoryBackend`
mirrors :class:`~rath.backend.abc.Backend`.
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Mapping
from dataclasses import dataclass, field, replace
from types import MappingProxyType, TracebackType
from typing import TYPE_CHECKING, Any, ClassVar

if TYPE_CHECKING:
    from rath.config.store import ConfigStore

from rath.memory.capabilities import MemoryCapabilities
from rath.memory.errors import MemoryStoreClosed
from rath.memory.op_types import MemoryOp
from rath.memory.results import MemoryResult

__all__ = ["MemoryStoreSpec", "MemoryStore", "MemoryBackend"]


[docs] @dataclass class MemoryStoreSpec: """User-facing description of a memory store to open. All fields are optional; backends may ignore fields that do not apply (e.g. an embedded backend ignores ``account_id``/``user_id``). """ namespace: str | None = None account_id: str | None = None user_id: str | None = None agent_id: str | None = None options: Mapping[str, Any] | None = None
[docs] @classmethod def from_config( cls, name: str | None = None, *, store: "ConfigStore | None" = None, **overrides: Any, ) -> "MemoryStoreSpec": """Build a :class:`MemoryStoreSpec` from ``~/.openrath/config.json``. Looks up ``name`` (or ``memory.default_provider`` when ``name=None``) under ``memory.providers``. Only **local** presets are modeled in config today; OpenViking stores should be built explicitly via :class:`MemoryStoreSpec` kwargs / ``options``. Lazy-imports :mod:`rath.config` so ``import rath.memory`` never touches the filesystem. Raises :class:`KeyError` when the named provider is missing. """ from rath.config.store import ConfigStore # local import — see docstring s = store or ConfigStore.load() entry = s.get_memory_provider(name) options: dict[str, Any] = {} if entry.path is not None: options["path"] = entry.path if entry.embedding_provider is not None: options["embedding_provider"] = entry.embedding_provider if entry.chat_provider is not None: options["chat_provider"] = entry.chat_provider base = cls( options=MappingProxyType(options) if options else None, ) if not overrides: return base return replace(base, **overrides)
[docs] @dataclass class MemoryStore: """Memory store handle with reference counting. Lifecycle mirrors :class:`~rath.backend.abc.BackendSandbox`: each :class:`~rath.flow.Agent` slot, each ``with store:`` block, and any explicit :meth:`acquire` counts as one reference. :meth:`release` decrements and, when the count reaches zero, calls ``backend.close(self)``. There is no "force close" path — callers that want immediate teardown must drop all references. :func:`MemoryBackend.open` returns a store with ``_refcount == 0``. The caller is expected to either bind it to an :class:`~rath.flow.Agent` (which acquires) or enter ``with store:`` (which acquires) before it can be safely held. """ backend: "MemoryBackend" handle: str spec: MemoryStoreSpec | None = None closed: bool = field(default=False) _refcount: int = field(default=0, repr=False) @property def refcount(self) -> int: """Current number of live references; read-only mirror of internal state.""" return self._refcount
[docs] def acquire(self) -> "MemoryStore": """Add one reference; return ``self`` for chaining.""" if self.closed: raise MemoryStoreClosed(self.handle) self._refcount += 1 return self
[docs] def release(self) -> None: """Drop one reference; close via the backend when the count hits zero.""" if self.closed: return self._refcount -= 1 if self._refcount <= 0: self.backend.close(self)
def __enter__(self) -> "MemoryStore": return self.acquire() def __exit__( self, exc_type: type[BaseException] | None, exc: BaseException | None, tb: TracebackType | None, ) -> None: self.release()
[docs] def dispatch(self, op: MemoryOp) -> MemoryResult: """Apply ``op`` through :meth:`MemoryBackend.dispatch`.""" if self.closed: raise MemoryStoreClosed(self.handle) return self.backend.dispatch(self, op)
[docs] class MemoryBackend(ABC): """Abstract base class for memory backends. Subclasses must: 1. Set the ``name`` class attribute and register via :func:`rath.memory.register`. 2. Implement the classmethods ``is_available``, ``capabilities`` and ``supported_ops``. 3. Implement the instance methods ``store_count``, ``open``, ``close`` and ``dispatch``. """ name: ClassVar[str]
[docs] @classmethod @abstractmethod def is_available(cls) -> bool: """Return whether this backend is usable in the current environment. Must be cheap (microseconds, no network, no subprocess). Examples: check that a required SDK is importable, or that a config file or environment variable is present. """
[docs] @classmethod @abstractmethod def capabilities(cls) -> MemoryCapabilities: """Return the static capability description of this backend type."""
[docs] @classmethod @abstractmethod def supported_ops(cls) -> frozenset[type[MemoryOp]]: """Return :class:`MemoryOp` subclasses this backend handles."""
[docs] @abstractmethod def store_count(self) -> int: """Return the number of open memory stores managed by this instance."""
[docs] @abstractmethod def open(self, spec: MemoryStoreSpec | None = None) -> MemoryStore: """Open a fresh memory store and return its handle."""
[docs] @abstractmethod def close(self, store: MemoryStore) -> None: """Close ``store`` and release resources. Calling close on an already-closed store is a no-op. """
[docs] @abstractmethod def dispatch(self, store: MemoryStore, op: MemoryOp) -> MemoryResult: """Execute ``op`` against ``store`` and return its result. Implementations should raise :class:`~rath.memory.errors.UnsupportedMemoryOp` for op types not in :meth:`supported_ops`. """