# Source code for rath.backend.abc
"""Backend abstract base, sandbox handle, and sandbox spec.
Backends expose a *synchronous* public API (``open`` / ``close`` /
``dispatch``) and an internal *asynchronous* protocol
(``_aopen`` / ``_aclose`` / ``_adispatch``). The sync methods funnel into
the internal coroutines via :class:`rath._async.runtime.OpenRathRuntime`,
which is the single background event loop OpenRath shares across all
subsystems.
Subclasses MUST implement the async hooks. The sync defaults provided here
schedule each call on the runtime; this gives any backend true cross-call
concurrency for free as long as its async implementation is non-blocking.
"""
from __future__ import annotations
import threading
from abc import ABC, abstractmethod
from collections.abc import Mapping, Sequence
from dataclasses import dataclass, field
from datetime import timedelta
from types import TracebackType
from typing import TYPE_CHECKING, ClassVar
if TYPE_CHECKING:
from rath.backend.stream import Stream
from rath.backend.capabilities import Capabilities
from rath.backend.errors import BackendSandboxClosed
from rath.backend.results import ToolResult
from rath.backend.tool_types import BackendTool
[docs]
@dataclass
class BackendSandboxSpec:
    """Declarative request describing the sandbox a caller wants opened.

    Every field defaults to ``None``, so callers only fill in what they care
    about. A backend is free to disregard any field that is meaningless for
    it (e.g. ``LocalBackend`` ignores ``image``).
    """

    # NOTE: field order is part of the generated positional ``__init__``
    # signature; append new fields at the end rather than reordering.
    image: str | None = None
    entrypoint: Sequence[str] | None = None
    env: Mapping[str, str] | None = None
    timeout: timedelta | None = None
    working_dir: str | None = None
[docs]
@dataclass
class BackendSandbox:
    """Reference-counted handle to one sandbox opened by a backend.

    Lifecycle is governed by :attr:`_refcount`: each :class:`Session.sandbox`
    slot, each ``with sandbox:`` block, and any explicit :meth:`acquire`
    counts as one reference. :meth:`release` drops one reference and, when
    none remain, calls ``backend.close(self)``. There is no "force close"
    path — callers that want immediate teardown must drop all references.

    :func:`Backend.open` returns a sandbox with ``_refcount == 0``. The
    caller is expected to either bind it to a :class:`Session` (which
    acquires) or enter ``with sandbox:`` (which acquires) before it can be
    safely held.

    No method of this class assigns :attr:`closed`; it is only read here.
    NOTE(review): presumably the backend's close path flips it — confirm
    against the concrete backends.
    """

    backend: "Backend"
    handle: str
    spec: BackendSandboxSpec | None = None
    closed: bool = field(default=False)
    _refcount: int = field(default=0, repr=False)
    # ``_refcount`` is read/written from both the host thread (sync facade)
    # and the runtime loop thread (async session loop). Updates are guarded
    # by ``_refcount_lock`` because they are not atomic across threads.
    _refcount_lock: threading.Lock = field(
        default_factory=threading.Lock, repr=False, compare=False
    )

    @property
    def refcount(self) -> int:
        """Return the current number of live references (never negative
        after a :meth:`release`); read-only mirror of internal state."""
        with self._refcount_lock:
            return self._refcount

    def acquire(self) -> "BackendSandbox":
        """Add one reference and return ``self`` for chaining.

        Raises:
            BackendSandboxClosed: if the sandbox is already marked closed.
        """
        with self._refcount_lock:
            if self.closed:
                raise BackendSandboxClosed(self.handle)
            self._refcount += 1
        return self

    def release(self) -> None:
        """Drop one reference; close via the backend when none remain.

        A release on an already-closed sandbox is a no-op. A release with no
        outstanding reference (e.g. on the zero-ref handle returned by
        ``Backend.open``) still triggers ``backend.close(self)``, but the
        counter is clamped at zero instead of going negative.
        """
        with self._refcount_lock:
            if self.closed:
                return
            # Clamp at zero: a negative count would make later ``acquire``
            # calls under-count their holders, so a subsequent ``release``
            # could close the sandbox while another holder still uses it.
            self._refcount = max(self._refcount - 1, 0)
            should_close = self._refcount == 0
        # The backend call is made after the lock is released, so the lock
        # is never held across ``backend.close``.
        if should_close:
            self.backend.close(self)

    def __enter__(self) -> "BackendSandbox":
        """Acquire one reference for the duration of the ``with`` block."""
        return self.acquire()

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        """Release the reference taken by ``__enter__``.

        Returns ``None``, so an exception raised inside the block propagates.
        """
        self.release()

    def dispatch(self, call: BackendTool) -> ToolResult | bool:
        """Apply ``call`` through :meth:`~rath.backend.abc.Backend.dispatch`.

        Raises:
            BackendSandboxClosed: if the sandbox is already marked closed.
        """
        if self.closed:
            raise BackendSandboxClosed(self.handle)
        return self.backend.dispatch(self, call)

    def stream(self, *, buffer: int = 0) -> "Stream":
        """Return a fresh :class:`Stream` bound to this sandbox.

        ``buffer=0`` (the default) means an unbounded queue; set a positive
        integer to apply backpressure on :meth:`Stream.submit`.
        """
        # Imported at call time rather than at module import time.
        from rath.backend.stream import Stream as _Stream

        return _Stream(self, buffer=buffer)
[docs]
class Backend(ABC):
    """Abstract base class every sandbox backend derives from.

    A concrete backend must:

    1. Set the ``name`` class attribute and register via
       :func:`rath.backend.register`.
    2. Implement the classmethods ``is_available``, ``capabilities`` and
       ``supported_calls``, which describe the backend type itself.
    3. Implement the instance method ``sandbox_count``.
    4. Implement the async hooks ``_aopen``, ``_aclose`` and ``_adispatch``.

    The sync ``open`` / ``close`` / ``dispatch`` defaults below hand the
    matching coroutine to :class:`rath._async.runtime.OpenRathRuntime`, so a
    single background loop services every subsystem concurrently.
    """

    name: ClassVar[str]

    @classmethod
    @abstractmethod
    def is_available(cls) -> bool:
        """Report whether this backend can be used in the current environment.

        Implementations must be cheap (microseconds, no network, no
        subprocess). Typical checks: a required SDK is importable, or a
        config file or environment variable is present.
        """

    @classmethod
    @abstractmethod
    def capabilities(cls) -> Capabilities:
        """Describe the static capabilities of this backend type."""

    @classmethod
    @abstractmethod
    def supported_calls(cls) -> frozenset[type[BackendTool]]:
        """Return the :class:`~rath.backend.tool_types.BackendTool` subclasses this backend handles."""

    @abstractmethod
    def sandbox_count(self) -> int:
        """Return how many open sandboxes this instance currently manages."""

    # ----- internal async hooks (subclasses MUST implement) ----------------

    @abstractmethod
    async def _aopen(self, spec: BackendSandboxSpec | None = None) -> BackendSandbox:
        """Async implementation of :meth:`open`: create a fresh sandbox."""

    @abstractmethod
    async def _aclose(self, sandbox: BackendSandbox) -> None:
        """Async implementation of :meth:`close`; must be idempotent."""

    @abstractmethod
    async def _adispatch(
        self, sandbox: BackendSandbox, call: BackendTool
    ) -> ToolResult | bool:
        """Async implementation of :meth:`dispatch`: run ``call`` on ``sandbox``."""

    # ----- public sync facade (default routes through the runtime) ---------
    # The runtime is imported inside each method rather than at module
    # level. NOTE(review): presumably to avoid an import cycle — confirm.

    def open(self, spec: BackendSandboxSpec | None = None) -> BackendSandbox:
        """Open a fresh sandbox and return its handle (sync facade).

        Hands :meth:`_aopen` to the shared runtime and returns its result.
        """
        from rath._async.runtime import runtime as _runtime

        run_on_loop = _runtime().run
        return run_on_loop(self._aopen(spec))

    def close(self, sandbox: BackendSandbox) -> None:
        """Close ``sandbox`` and release its resources (sync facade).

        Closing an already-closed sandbox is a no-op; this method always
        forwards to :meth:`_aclose`, which is required to be idempotent.
        """
        from rath._async.runtime import runtime as _runtime

        run_on_loop = _runtime().run
        run_on_loop(self._aclose(sandbox))

    def dispatch(self, sandbox: BackendSandbox, call: BackendTool) -> ToolResult | bool:
        """Execute ``call`` against ``sandbox`` and return its result (sync facade).

        Hands :meth:`_adispatch` to the shared runtime and returns its result.
        """
        from rath._async.runtime import runtime as _runtime

        run_on_loop = _runtime().run
        return run_on_loop(self._adispatch(sandbox, call))