Source code for rath.flow.compressor

"""Workflow wrapper that runs :func:`~rath.session.compress.run_session_compress`."""

from __future__ import annotations

from collections.abc import Callable

from rath.flow.agent_param import AgentParam
from rath.flow.workflow import Workflow
from rath.llm import RathLLMStreamDelta
from rath.llm.provider import Provider
from rath.session import Session, run_session_compress


class Compressor(Workflow):
    """Workflow that delegates to :func:`run_session_compress`.

    The instance holds an :class:`AgentParam` pairing an agent session
    (built from the compression instruction) with the provider, plus an
    optional streaming callback. Calling :meth:`forward` passes those,
    together with the caller's session, to ``run_session_compress``.
    """

    def __init__(
        self,
        compress_instruction: str,
        provider: Provider,
        *,
        on_event: Callable[[RathLLMStreamDelta], None] | None = None,
    ):
        """Set up the agent parameter used for compression.

        Args:
            compress_instruction: Prompt text handed to
                ``Session.from_agent_prompt`` to create the agent session.
            provider: Provider stored on the agent parameter and later
                passed to ``run_session_compress`` as ``agent_provider``.
            on_event: Optional callback accepting a ``RathLLMStreamDelta``;
                forwarded unchanged to ``run_session_compress``.
        """
        # Initialise the base workflow before assigning any attributes.
        super().__init__()
        self._on_event = on_event

        instruction_session = Session.from_agent_prompt(compress_instruction)
        self.agent = AgentParam(
            agent_session=instruction_session,
            provider=provider,
        )

    def forward(self, session: Session) -> Session:
        """Run ``run_session_compress`` on ``session`` and return its result.

        Args:
            session: Passed to ``run_session_compress`` as ``user_session``.

        Returns:
            Whatever ``run_session_compress`` returns for the given inputs.
        """
        agent = self.agent
        compressed = run_session_compress(
            user_session=session,
            agent_session=agent.agent_session,
            agent_provider=agent.provider,
            on_event=self._on_event,
        )
        return compressed
# Names exported by ``from rath.flow.compressor import *``.
__all__ = ["Compressor"]