"""Process-independent registry of persisted sandboxes.
:class:`PersistentSandboxRegistry` maps UUIDs to either:
* a stable on-disk working directory under
``.openrath/sandboxes/local/<uuid>/`` for :class:`LocalBackend` reuse, or
* a JSON record under ``.openrath/sandboxes/opensandbox/<uuid>.json`` that
pins an OpenSandbox remote sandbox id for a future reattach (Phase B).
The registry holds no live :class:`BackendSandbox` handles — those remain
process-local. It only tracks the **identity** so subsequent processes can
reopen a sandbox with the same spec / remote id.
"""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from uuid import UUID, uuid4
from rath.backend.abc import BackendSandbox, BackendSandboxSpec
from rath.backend.persistence.paths import (
ensure_local_root,
ensure_opensandbox_root,
local_root,
local_sandbox_dir,
opensandbox_index_path,
opensandbox_root,
)
from rath.backend.persistence.spec_json import (
SCHEMA_VERSION,
spec_from_jsonable,
spec_to_jsonable,
)
from rath.backend.registry import get as backend_get
from rath.config.secrets import chmod_user_only
__all__ = [
"PersistentSandboxRegistry",
"RemoteSandboxRecord",
]
logger = logging.getLogger(__name__)
[docs]
@dataclass(frozen=True, kw_only=True, slots=True)
class RemoteSandboxRecord:
"""One ``.openrath/sandboxes/opensandbox/<uuid>.json`` decoded."""
schema_version: int
id: UUID
backend: str
remote_id: str
spec: BackendSandboxSpec | None
created_at: datetime
last_used_at: datetime
path: Path = field(repr=False)
[docs]
class PersistentSandboxRegistry:
    """Filesystem-backed registry of persisted sandbox identities.

    Instances are cheap; everything lives on disk under
    ``.openrath/sandboxes/{local,opensandbox}/``. The class does **no**
    locking; callers that share a registry across threads should serialize
    writes externally.
    """

    # ---------------------------------------------------------------- local

    def alloc_local_id(self) -> UUID:
        """Generate a new UUID and create its working directory.

        Returns the UUID. Use :meth:`local_path` to resolve it back to a
        :class:`pathlib.Path`. Repeated calls always return a fresh UUID;
        for "give me one if missing, else reuse", call :meth:`ensure_local`
        with an explicit id.
        """
        sid = uuid4()
        local_sandbox_dir(sid).mkdir(parents=True, exist_ok=True)
        return sid

    def ensure_local(self, sandbox_id: UUID | str) -> Path:
        """Create the working directory for ``sandbox_id`` if missing; return it.

        Idempotent. Useful when a caller already knows the id (e.g. from a
        persisted session header) and wants to rebind the same workdir.
        """
        target = local_sandbox_dir(sandbox_id)
        target.mkdir(parents=True, exist_ok=True)
        return target

    def local_path(self, sandbox_id: UUID | str) -> Path:
        """Resolve a local sandbox id to its on-disk working directory.

        Does not check existence — use :meth:`ensure_local` to create on
        demand. Returns the path even when the directory has been removed,
        so callers can decide whether to recreate.
        """
        return local_sandbox_dir(sandbox_id)

    def list_local(self) -> list[UUID]:
        """Enumerate UUID-named subdirectories under ``sandboxes/local/``.

        Entries are visited in sorted path order. Non-directories and
        directories whose name is not a valid UUID are ignored.
        """
        root = local_root()
        if not root.is_dir():
            return []
        ids: list[UUID] = []
        for entry in sorted(root.iterdir()):
            if not entry.is_dir():
                continue
            try:
                ids.append(UUID(entry.name))
            except ValueError:
                logger.debug("ignoring non-UUID dir in local sandboxes: %s", entry)
        return ids

    def delete_local(self, sandbox_id: UUID | str) -> bool:
        """Remove the on-disk working directory for a local sandbox.

        Returns ``True`` when the directory existed and was removed,
        ``False`` when it was already absent (including when it vanished
        while this call was removing it). Idempotent. Other removal errors
        (e.g. permissions) propagate.
        """
        import shutil

        path = local_sandbox_dir(sandbox_id)
        if not path.exists():
            return False
        try:
            shutil.rmtree(path, ignore_errors=False)
        except FileNotFoundError:
            # Something else removed entries underneath us. Only treat that
            # as "already absent" when the whole directory is really gone;
            # otherwise the tree is half-deleted and the caller must know.
            if path.exists():
                raise
            return False
        return True

    def prune_local(self, *, older_than: timedelta) -> list[UUID]:
        """Remove local sandbox dirs whose mtime is older than ``older_than``.

        The age is taken from the mtime of the sandbox directory itself.
        Directories that cannot be stat'ed are skipped. Returns the list of
        removed ids in deletion order. Useful for a weekly cron / startup
        sweep —
        ``PersistentSandboxRegistry().prune_local(older_than=timedelta(days=30))``.
        """
        cutoff = datetime.now(timezone.utc) - older_than
        removed: list[UUID] = []
        for sid in self.list_local():
            path = local_sandbox_dir(sid)
            try:
                mtime = datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc)
            except OSError:
                # Disappeared or became unreadable since list_local(); skip.
                continue
            if mtime < cutoff and self.delete_local(sid):
                removed.append(sid)
        return removed

    # ---------------------------------------------------------------- remote

    def record_remote(
        self,
        backend: str,
        remote_id: str,
        spec: BackendSandboxSpec | None = None,
        *,
        sandbox_id: UUID | None = None,
    ) -> UUID:
        """Persist ``remote_id`` (e.g. an OpenSandbox ``native.id``) in the registry.

        ``backend`` is the backend name (typically ``"opensandbox"``).
        ``spec`` is the :class:`BackendSandboxSpec` used to create the
        remote sandbox; it round-trips through the same JSON projection as
        the session header. ``sandbox_id`` selects the registry UUID to
        write under; when omitted a fresh UUID is generated. Recording under
        an id that already exists overwrites that record, including its
        ``created_at``.

        Returns the registry UUID (NOT ``remote_id``) so callers can keep
        using a stable local handle.
        """
        sid = sandbox_id or uuid4()
        stamp = datetime.now(timezone.utc).isoformat()
        ensure_opensandbox_root()
        record = {
            "schema_version": SCHEMA_VERSION,
            "id": str(sid),
            "backend": backend,
            "remote_id": remote_id,
            "spec": spec_to_jsonable(spec),
            "created_at": stamp,
            "last_used_at": stamp,
        }
        self._write_record(opensandbox_index_path(sid), record)
        return sid

    def touch_remote(self, sandbox_id: UUID | str) -> None:
        """Update ``last_used_at`` on the remote sandbox index file.

        Silently no-ops when the record is missing — callers should rely on
        :meth:`load_remote` first if presence matters. A record that cannot
        be read, is not valid UTF-8 JSON, or does not hold a JSON object is
        left untouched and a warning is logged.
        """
        path = opensandbox_index_path(sandbox_id)
        if not path.is_file():
            return
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError, OSError):
            logger.warning("touch_remote: %s is unreadable", path, exc_info=True)
            return
        if not isinstance(data, dict):
            # A list / scalar root cannot take a "last_used_at" key; rewriting
            # it would either raise or destroy whatever the file holds.
            logger.warning("touch_remote: %s is not a JSON object", path)
            return
        data["last_used_at"] = datetime.now(timezone.utc).isoformat()
        self._write_record(path, data)

    def load_remote(self, sandbox_id: UUID | str) -> RemoteSandboxRecord | None:
        """Read one remote-sandbox index file. Returns ``None`` when missing."""
        path = opensandbox_index_path(sandbox_id)
        if not path.is_file():
            return None
        return _decode_remote(path)

    def list_remote(self) -> list[RemoteSandboxRecord]:
        """Enumerate every remote sandbox record. Unreadable files are skipped.

        Only regular ``*.json`` files directly under the opensandbox root
        are considered. Records are returned ordered by ``created_at``
        (naive timestamps are interpreted as UTC for ordering); ties keep
        sorted path order.
        """
        root = opensandbox_root()
        if not root.is_dir():
            return []
        records: list[RemoteSandboxRecord] = []
        for entry in sorted(root.iterdir()):
            if not entry.is_file() or entry.suffix != ".json":
                continue
            decoded = _decode_remote(entry)
            if decoded is not None:
                records.append(decoded)
        # Normalise before comparing: Python refuses to order a naive
        # datetime against an aware one, and hand-edited or legacy records
        # may lack an offset.
        records.sort(key=lambda r: self._as_utc(r.created_at))
        return records

    def delete_remote(self, sandbox_id: UUID | str) -> bool:
        """Remove the index file for a recorded remote sandbox.

        Returns ``True`` when the file existed and was removed; ``False``
        when absent (including when it vanished just before removal). Does
        **not** kill the remote container on the server — that's the
        caller's responsibility via the backend's ``close()``.
        """
        path = opensandbox_index_path(sandbox_id)
        if not path.exists():
            return False
        try:
            path.unlink()
        except FileNotFoundError:
            # Removed by someone else between the check and the unlink.
            return False
        return True

    def prune_remote(self, *, older_than: timedelta) -> list[UUID]:
        """Remove index files whose ``last_used_at`` is older than ``older_than``.

        Naive ``last_used_at`` values are interpreted as UTC. Returns the
        deleted ids in deletion order. The remote containers themselves are
        not touched.
        """
        cutoff = datetime.now(timezone.utc) - older_than
        removed: list[UUID] = []
        for record in self.list_remote():
            last_used = self._as_utc(record.last_used_at)
            if last_used < cutoff and self.delete_remote(record.id):
                removed.append(record.id)
        return removed

    def reattach_remote(self, sandbox_id: UUID | str) -> BackendSandbox:
        """Reattach to a previously recorded remote sandbox by its registry id.

        Looks up the index file, then delegates to the named backend's
        ``attach`` method (currently only :class:`OpenSandboxBackend` provides
        one). Updates ``last_used_at`` on success.

        Raises :class:`KeyError` when the registry id is unknown, and
        ``AttributeError`` when the recorded backend has no ``attach`` —
        either path produces a clear error rather than silently creating a
        new container.
        """
        record = self.load_remote(sandbox_id)
        if record is None:
            raise KeyError(
                f"no remote sandbox recorded under id={sandbox_id!r}; "
                f"available: {[str(r.id) for r in self.list_remote()]}",
            )
        backend = backend_get(record.backend)
        attach = getattr(backend, "attach", None)
        if attach is None:
            raise AttributeError(
                f"backend {record.backend!r} does not support reattach "
                f"(no .attach(remote_id) method); cannot reuse remote_id "
                f"{record.remote_id!r}",
            )
        sandbox: BackendSandbox = attach(record.remote_id, spec=record.spec)
        # Only bump the timestamp once attach() has returned without raising.
        self.touch_remote(record.id)
        return sandbox

    # ---------------------------------------------------------------- bootstrap

    def ensure_dirs(self) -> None:
        """Create both ``sandboxes/local/`` and ``sandboxes/opensandbox/`` roots."""
        ensure_local_root()
        ensure_opensandbox_root()

    # ---------------------------------------------------------------- helpers

    @staticmethod
    def _as_utc(moment: datetime) -> datetime:
        """Return ``moment`` unchanged if aware, else tagged as UTC."""
        if moment.tzinfo is None:
            return moment.replace(tzinfo=timezone.utc)
        return moment

    @staticmethod
    def _write_record(path: Path, payload: dict[str, object]) -> None:
        """Write ``payload`` as indented JSON to ``path`` via a temp file + rename.

        The JSON is written to a sibling ``<name>.tmp`` file, passed through
        ``chmod_user_only``, and then renamed over ``path``, so an
        interrupted write leaves the previous record intact rather than a
        truncated file. The ``.tmp`` suffix keeps the staging file out of
        :meth:`list_remote`, which only considers ``*.json``. The staging
        file is removed again if any step fails.
        """
        staging = path.with_name(f"{path.name}.tmp")
        try:
            staging.write_text(
                json.dumps(payload, indent=2, ensure_ascii=False) + "\n",
                encoding="utf-8",
            )
            chmod_user_only(staging)
            staging.replace(path)
        except BaseException:
            staging.unlink(missing_ok=True)
            raise
def _decode_remote(path: Path) -> RemoteSandboxRecord | None:
    """Decode one remote-sandbox index file into a :class:`RemoteSandboxRecord`.

    Returns ``None`` after logging a warning when the file cannot be read,
    is not valid UTF-8 or not valid JSON, does not hold a JSON object, or
    has missing / malformed fields (``id``, ``remote_id``, ``created_at``
    and ``last_used_at`` are required; ``schema_version`` defaults to ``0``
    and ``backend`` to ``""``).
    """
    try:
        raw = json.loads(path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError, OSError):
        # UnicodeDecodeError is listed explicitly: read_text() raises it for
        # non-UTF-8 bytes and it is neither a JSONDecodeError nor an OSError,
        # so one stray binary file would otherwise abort a directory scan.
        logger.warning("unreadable remote-sandbox record: %s", path, exc_info=True)
        return None
    if not isinstance(raw, dict):
        logger.warning("remote-sandbox record is not a JSON object: %s", path)
        return None
    try:
        return RemoteSandboxRecord(
            schema_version=int(raw.get("schema_version", 0)),
            id=UUID(str(raw["id"])),
            backend=str(raw.get("backend", "")),
            remote_id=str(raw["remote_id"]),
            spec=spec_from_jsonable(raw.get("spec")),
            created_at=datetime.fromisoformat(str(raw["created_at"])),
            last_used_at=datetime.fromisoformat(str(raw["last_used_at"])),
            path=path,
        )
    except (KeyError, ValueError, TypeError):
        logger.warning("malformed remote-sandbox record: %s", path, exc_info=True)
        return None