Sanjaya β€” Generic Extensible Agent Framework Plan Β· 2026-04-05

Sanjaya Refactor: Generic Extensible Agent Framework

Overview

Refactor sanjaya from a hardcoded video QA pipeline into a generic, extensible RLM agent framework. The core idea stays the same β€” the LLM writes Python code in a sandboxed REPL to solve problems β€” but the framework becomes domain-agnostic with video as a built-in toolkit.

Three pillars: pydantic-ai (LLM), pydantic-monty (REPL sandbox), logfire (tracing).

Package: sanjaya on PyPI, installable via uv add sanjaya.

Desired End State

# Text RLM β€” Jupyter notebook
from sanjaya import Agent

agent = Agent()
answer = agent.ask("What is the main argument?", context=open("paper.txt").read())
print(answer.text)

# Video RLM β€” Jupyter notebook
from sanjaya import Agent
from sanjaya.tools.video import VideoToolkit

agent = Agent(model="openrouter:openai/gpt-5.3-codex")
agent.use(VideoToolkit(vision_model="openrouter:openai/gpt-4.1"))

answer = agent.ask("What happens after the explosion?", video="movie.mp4")
for e in answer.evidence:
    print(f"  [{e.source}] {e.rationale}")

# Custom tools
from sanjaya import Agent, tool

@tool
def search_kb(query: str, limit: int = 10) -> list[dict]:
    """Search the knowledge base for relevant documents."""
    return my_db.search(query, limit)

agent = Agent()
agent.use(search_kb)
answer = agent.ask("What's our refund policy?", context=email_text)

Verification

What We’re NOT Doing

Current State Analysis

Key Discoveries


Target Package Structure

src/sanjaya/
    __init__.py              β€” exports: Agent, tool, Toolkit, Answer, Evidence
    agent.py                 β€” Agent class (single entry point)
    answer.py                β€” Answer, Evidence models
    settings.py              β€” Settings (env/dotenv)

    core/
        __init__.py
        loop.py              β€” iteration loop: extract blocks β†’ execute β†’ check done
        repl.py              β€” MontyREPL wrapper with dynamic tool injection
        compaction.py        β€” context compaction at threshold
        budget.py            β€” cost/token budget tracking with limits
        prompts.py           β€” system prompt builder (auto-generates tool docs)
        blocks.py            β€” code block extraction + execution feedback formatting
        errors.py            β€” Monty error hints + smart feedback formatting
        patterns.py          β€” PatternMemory: learn working code across runs

    llm/
        __init__.py
        client.py            β€” LLMClient: text, vision, batched (unified)
        types.py             β€” UsageSnapshot, CallMetadata

    retrieval/
        __init__.py          β€” exports: RetrievalBackend, SQLiteFTSBackend
        base.py              β€” RetrievalBackend ABC
        sqlite_fts.py        β€” SQLiteFTSBackend (default, zero-setup)
        token_overlap.py     β€” TokenOverlapBackend (legacy, in-memory only)

    tools/
        __init__.py          β€” exports: tool, Tool, Toolkit
        base.py              β€” Tool dataclass, Toolkit base, @tool decorator
        registry.py          β€” ToolRegistry: register, lookup, prompt generation
        builtins.py          β€” llm_query, llm_query_batched, done, get_state

        video/
            __init__.py      β€” exports: VideoToolkit
            toolkit.py       β€” VideoToolkit(Toolkit): bundles all video tools
            retrieval.py     β€” list_windows (uses RetrievalBackend for scoring)
            media.py         β€” get_video_info, extract_clip, sample_frames
            vision.py        β€” vision_query, vision_query_batched
            transcription.py β€” transcribe (subtitle generation)
            workspace.py     β€” ArtifactWorkspace
            mount.py         β€” WorkspaceMount / OSAccess bridge

        report/
            __init__.py      β€” exports: ReportToolkit
            toolkit.py       β€” ReportToolkit(Toolkit): save + index
            writers.py       β€” save_note, save_qmd, save_data implementations

    tracing/
        __init__.py
        tracer.py            β€” single unified Tracer (generic span methods)
        events.py            β€” in-memory event buffer for SSE polling
        observability.py     β€” logfire configure + instrument_pydantic_ai

API Specifications

1. sanjaya.Agent

The single entry point. Replaces both RLM_REPL and VideoRLM_REPL.

class Agent:
    """RLM agent that solves problems by writing code in a sandboxed REPL."""

    def __init__(
        self,
        model: str = "openrouter:openai/gpt-5.3-codex",
        sub_model: str = "openrouter:openai/gpt-4.1-mini",
        vision_model: str | None = None,
        fallback_model: str | None = "openrouter:vikhyatk/moondream2",
        max_iterations: int = 20,
        max_budget_usd: float | None = None,
        max_timeout_s: float | None = None,
        compaction_threshold: float = 0.85,
        tracing: bool = True,
    ):
        """
        Args:
            model: Primary orchestrator model (writes REPL code).
            sub_model: Cheaper model used by llm_query() inside the REPL.
            vision_model: Model for vision_query(). Falls back to sub_model.
            fallback_model: Last-resort model if sub_model fails.
            max_iterations: Hard cap on orchestrator loop iterations.
            max_budget_usd: Stop when cumulative LLM cost exceeds this.
            max_timeout_s: Stop when wall-clock time exceeds this.
            compaction_threshold: Fraction of context window at which to
                compact message history (0.0 to disable).
            tracing: Enable logfire tracing.
        """

    def use(self, *tools_or_toolkits: "Tool | Toolkit") -> "Agent":
        """Register tools or toolkits. Chainable.

        >>> agent = Agent().use(VideoToolkit()).use(my_custom_tool)
        """

    def ask(
        self,
        question: str,
        *,
        context: Any = None,
        video: str | None = None,
        subtitle: str | None = None,
    ) -> "Answer":
        """Run the RLM loop and return a structured answer.

        Args:
            question: The question to answer.
            context: Arbitrary context data (str, list, dict). Accessible
                as `context` variable in the REPL.
            video: Path to a video file. If provided, auto-registers
                VideoToolkit (if not already registered) and makes video
                metadata available as context.
            subtitle: Path to subtitle/transcript file. Used with video.

        Returns:
            Structured Answer with text, evidence, cost, and usage.
        """

    async def ask_async(
        self,
        question: str,
        **kwargs,
    ) -> "Answer":
        """Async version of ask()."""

    @property
    def last_answer(self) -> "Answer | None":
        """Most recent answer, for notebook inspection."""

    @property
    def cost_so_far(self) -> float:
        """Cumulative USD spent across all ask() calls."""

    def reset(self) -> None:
        """Clear all state (budget, history, workspace)."""

Usage in Jupyter:

from sanjaya import Agent
from sanjaya.tools.video import VideoToolkit

agent = Agent(
    model="openrouter:openai/gpt-5.3-codex",
    sub_model="openrouter:openai/gpt-4.1-mini",
    vision_model="openrouter:openai/gpt-4.1",
    max_iterations=15,
    max_budget_usd=2.0,
)
agent.use(VideoToolkit(workspace_dir="./my_artifacts"))

answer = agent.ask(
    "Does the defendant touch the evidence bag?",
    video="/data/deposition_clip.mp4",
)

print(answer.text)
print(f"Cost: ${answer.cost_usd:.4f}")
print(f"Iterations: {answer.iterations}")
for ev in answer.evidence:
    print(f"  [{ev.source}] {ev.rationale}")

2. sanjaya.Answer and sanjaya.Evidence

Generic answer model. Tools can attach domain-specific evidence.

class Evidence(BaseModel):
    """A piece of evidence supporting the answer."""
    source: str                      # e.g. "video:12.5s-45.0s", "context:chunk-3"
    rationale: str                   # why this evidence matters
    artifacts: dict[str, Any] = {}   # tool-specific: clip_path, frame_paths, etc.

class Answer(BaseModel):
    """Structured output from Agent.ask()."""
    question: str
    text: str                        # the final answer string
    evidence: list[Evidence] = []
    iterations: int                  # how many orchestrator loops ran
    cost_usd: float | None = None
    input_tokens: int | None = None
    output_tokens: int | None = None
    wall_time_s: float | None = None
    trace_id: str | None = None      # logfire trace ID for linking

3. sanjaya.tools β€” Tool System

@tool decorator

from sanjaya import tool

@tool
def search_database(query: str, limit: int = 10) -> list[dict]:
    """Search the vector database for relevant documents.

    Use this when you need to find information related to a query.
    Returns a list of document dicts with 'text' and 'score' keys.

    Args:
        query: The search query string.
        limit: Maximum number of results to return.
    """
    return db.search(query, limit)

The decorator inspects the function signature + docstring and produces a Tool object. The docstring becomes the tool description in the REPL prompt (so the LLM knows when and how to use it).
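
A minimal sketch of how such a decorator can be built with `inspect`; the dataclass fields here are trimmed relative to the full Tool/ToolParam spec below, and the docstring-Args parsing is omitted:

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class ToolParam:
    name: str
    type_hint: str
    default: Any  # inspect.Parameter.empty if required

@dataclass
class Tool:
    name: str
    description: str
    fn: Callable[..., Any]
    parameters: dict[str, ToolParam]

def tool(fn: Callable[..., Any]) -> Tool:
    """Turn a plain function into a Tool by reading its signature + docstring."""
    sig = inspect.signature(fn)
    params = {
        p.name: ToolParam(
            name=p.name,
            type_hint=(p.annotation.__name__
                       if isinstance(p.annotation, type) else str(p.annotation)),
            default=p.default,
        )
        for p in sig.parameters.values()
    }
    return Tool(
        name=fn.__name__,
        description=inspect.getdoc(fn) or "",
        fn=fn,
        parameters=params,
    )

@tool
def greet(name: str, excited: bool = False) -> str:
    """Return a greeting for the given name."""
    return f"Hello, {name}!" + ("!" if excited else "")
```

Note that after decoration `greet` is a Tool object, not a callable; the registry invokes `greet.fn` when injecting it into the REPL.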

Tool dataclass

@dataclass
class Tool:
    """A callable tool available in the agent's REPL environment."""
    name: str                          # function name in REPL
    description: str                   # shown to LLM in system prompt
    fn: Callable[..., Any]             # the actual implementation
    parameters: dict[str, ToolParam]   # parsed from signature
    return_type: str                   # e.g. "list[dict]", "str"

@dataclass
class ToolParam:
    name: str
    type_hint: str       # e.g. "str", "int", "list[str]"
    default: Any         # inspect.Parameter.empty if required
    description: str     # parsed from docstring Args section

Toolkit base class

class Toolkit(ABC):
    """Bundle of related tools with shared state."""

    @abstractmethod
    def tools(self) -> list[Tool]:
        """Return all tools this toolkit provides."""

    def setup(self, context: dict[str, Any]) -> None:
        """Called once before the RLM loop starts.
        Receives the full context dict (question, video path, etc.).
        Use this to initialize shared state (workspace, transcript, etc.).
        """

    def teardown(self) -> None:
        """Called after the RLM loop ends. Cleanup resources."""

    def get_state(self) -> dict[str, Any]:
        """Return toolkit state for introspection.
        Merged into the get_state() builtin response.
        """
        return {}

    def build_evidence(self) -> list[Evidence]:
        """Convert toolkit artifacts into Evidence items for the Answer."""
        return []

    def prompt_section(self) -> str | None:
        """Optional extra prompt text injected into the system prompt.
        Use for domain-specific strategy guidance.
        """
        return None
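
A toy example of the Toolkit contract in action, using a stub base class so the snippet runs standalone (in the real package you would subclass sanjaya.Toolkit, and tools() would return Tool objects rather than bare callables):

```python
from abc import ABC, abstractmethod
from typing import Any, Callable

class Toolkit(ABC):  # stand-in for sanjaya.Toolkit
    @abstractmethod
    def tools(self) -> list[Callable]: ...
    def setup(self, context: dict[str, Any]) -> None: ...
    def get_state(self) -> dict[str, Any]:
        return {}

class CounterToolkit(Toolkit):
    """Toy toolkit demonstrating shared state across its tools."""

    def __init__(self) -> None:
        self.calls = 0

    def tools(self) -> list[Callable]:
        return [self.bump]

    def bump(self, by: int = 1) -> int:
        """Increment the shared counter and return its new value."""
        self.calls += by
        return self.calls

    def get_state(self) -> dict[str, Any]:
        # Merged into the get_state() builtin response
        return {"calls": self.calls}

tk = CounterToolkit()
tk.bump()
tk.bump(2)
```

Because the tools are bound methods, they share the toolkit instance's state, which is exactly what get_state() surfaces back to the model.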

ToolRegistry

class ToolRegistry:
    """Manages tool registration and generates prompt documentation."""

    def register(self, tool: Tool) -> None: ...
    def register_toolkit(self, toolkit: Toolkit) -> None: ...
    def get(self, name: str) -> Tool | None: ...
    def all_tools(self) -> list[Tool]: ...

    def build_external_functions(self) -> dict[str, Callable]:
        """Build the external_functions dict for MontyRepl.feed_run().
        Maps tool names to their callables.
        """

    def generate_tool_docs(self) -> str:
        """Auto-generate the tool contract section of the system prompt.

        Example output:
            Available tools in the REPL:
            - `search_transcript(query: str, top_k: int = 10) -> list[dict]`
              Search subtitle segments for relevant content.
            - `extract_clip(window_id: str = None, start_s: float = None, ...) -> dict`
              Extract a video clip from the source video.
        """

4. sanjaya.tools.video.VideoToolkit

The built-in toolkit for video analysis. Provides all tools the REPL needs to work with video.

class VideoToolkit(Toolkit):
    """Complete video analysis toolkit with retrieval, media, and vision tools."""

    def __init__(
        self,
        vision_model: str | None = None,       # override agent's vision_model
        subtitle_mode: str = "auto",            # auto | local | api | none
        workspace_dir: str = "./sanjaya_artifacts",
        max_frames_per_clip: int = 8,
        window_size_s: float = 45.0,
        stride_s: float = 30.0,
        retrieval: "RetrievalBackend | None" = None,  # defaults to SQLiteFTSBackend
    ): ...

    def tools(self) -> list[Tool]:
        """Returns all 7 video tools."""

    def setup(self, context: dict[str, Any]) -> None:
        """Resolves subtitles, loads transcript, indexes segments in
        retrieval backend, creates workspace."""

    def get_state(self) -> dict[str, Any]:
        """Returns accumulated clips, windows, visited ranges, manifest."""

    def build_evidence(self) -> list[Evidence]:
        """Converts clip manifest into Evidence items."""

    def prompt_section(self) -> str:
        """Returns video-specific strategy guidance + transcript."""

Video Tools (7 total)

Each of these becomes a Tool registered by VideoToolkit.tools().

Removed tools and why:

- No search_transcript β€” the root RLM has the full transcript in context and can write its own search code (string matching, regex, llm_query over chunks). Don’t hardcode what the model can program itself.
- No get_transcript β€” the transcript is already injected into context via prompt_section(). The model accesses it as `context` in the REPL.
- No get_clip_manifest β€” merged into get_state(), which now returns everything (clips, windows, visited ranges, manifest paths).

Guiding principle: If the model can do it with code in the REPL, don’t make it a tool. Tools exist for things the sandbox CAN’T do (shell out to ffmpeg, call vision LLMs, read toolkit internal state).

# ── Information ──────────────────────────────────────────

def get_video_info() -> dict:
    """Get video metadata: duration, resolution, codec, file size.

    Call this first to understand what you're working with.
    Returns: {"duration_s": float, "width": int, "height": int,
              "codec": str, "file_size_mb": float}
    """

# ── Retrieval ────────────────────────────────────────────

def list_windows(
    question: str | None = None,
    top_k: int | None = None,
    window_size_s: float = 45.0,
) -> list[dict]:
    """Generate ranked candidate temporal windows for analysis.

    Uses the retrieval backend (SQLite FTS by default) for scoring
    subtitle segments, combined with uniform sliding windows.
    Previously-visited windows are auto-excluded (progressive scanning).
    top_k auto-scales with video duration when None.

    Args:
        question: Override question for scoring. Defaults to the original query.
        top_k: Max windows. None = auto-scale with duration.
        window_size_s: Window duration in seconds.

    Returns: [{"window_id": str, "start_s": float, "end_s": float,
               "score": float, "strategy": str, "reason": str | None}]
    """

# ── Media extraction ─────────────────────────────────────

def extract_clip(
    *,
    window_id: str | None = None,
    start_s: float | None = None,
    end_s: float | None = None,
) -> dict:
    """Extract a video clip from the source video.

    Provide either a window_id (from list_windows) or explicit timestamps.
    The extracted clip is saved to the workspace.

    Returns: {"clip_id": str, "clip_path": str, "start_s": float, "end_s": float}
    """

def sample_frames(
    *,
    clip_id: str | None = None,
    clip_path: str | None = None,
    max_frames: int = 8,
) -> list[str]:
    """Sample uniformly-spaced frames from an extracted clip.

    Args:
        clip_id: ID from extract_clip().
        clip_path: Direct path (alternative to clip_id).
        max_frames: Number of frames to extract.

    Returns: List of JPEG frame file paths.
    """

# ── Vision ───────────────────────────────────────────────

def vision_query(
    *,
    prompt: str | None = None,
    clip_id: str | None = None,
    frame_paths: list[str] | None = None,
) -> str:
    """Query a vision model about video frames or a clip.

    Sends frames (preferred) or a clip to a multimodal LLM.
    If no prompt given, uses the original question.

    Args:
        prompt: What to ask about the visual content.
        clip_id: ID from extract_clip() (will use its frames).
        frame_paths: Direct frame paths (alternative to clip_id).

    Returns: Vision model's text response.
    """

def vision_query_batched(
    queries: list[dict],
) -> list[str]:
    """Run multiple vision queries concurrently.

    Much faster than sequential vision_query() calls for independent
    analyses of different clips.

    Args:
        queries: List of dicts, each with keys matching vision_query params:
            [{"prompt": "...", "clip_id": "..."}, ...]

    Returns: List of responses in the same order as input queries.
    """

# ── State ────────────────────────────────────────────────

def get_state() -> dict:
    """Inspect accumulated analysis state and workspace manifest.

    Returns: {"clips_extracted": int, "windows_visited": int,
              "visited_ranges": [...], "total_coverage_s": float,
              "clips": {...}, "candidate_windows": [...],
              "run_id": str}
    """

5. sanjaya.retrieval β€” Pluggable Retrieval Backend

The retrieval layer is shared across toolkits. Default is SQLite FTS5 β€” zero external dependencies, instant indexing, persists to disk for cross-run memory.

class RetrievalBackend(ABC):
    """Pluggable retrieval backend for semantic/keyword search."""

    @abstractmethod
    def index(
        self,
        documents: list[str],
        metadata: list[dict[str, Any]] | None = None,
        collection: str = "default",
    ) -> None:
        """Index documents. Metadata is stored alongside for filtering."""

    @abstractmethod
    def search(
        self,
        query: str,
        *,
        top_k: int = 10,
        collection: str = "default",
        filter_condition: str | None = None,
    ) -> list[dict[str, Any]]:
        """Search indexed documents.

        Returns: [{"text": str, "score": float, "metadata": dict, "doc_id": int}]
        """

    @abstractmethod
    def delete(
        self,
        *,
        collection: str = "default",
        condition: str | None = None,
    ) -> int:
        """Delete documents. Returns count deleted."""


class SQLiteFTSBackend(RetrievalBackend):
    """SQLite FTS5 full-text search. Default backend.

    - Zero external dependencies (sqlite3 is stdlib)
    - Indexing: <100ms for a video's worth of subtitle segments
    - Persists to disk β€” cross-run memory out of the box
    - BM25 ranking built into FTS5
    """

    def __init__(self, path: str = ".sanjaya/retrieval.db"):
        """
        Args:
            path: Path to SQLite database file. Created if it doesn't exist.
                  Use ":memory:" for ephemeral (single-run) usage.
        """

    def index(self, documents, metadata=None, collection="default"):
        """INSERT into FTS5 virtual table. <100ms for hundreds of segments."""

    def search(self, query, *, top_k=10, collection="default", filter_condition=None):
        """FTS5 MATCH with BM25 ranking. <10ms per query."""

    def delete(self, *, collection="default", condition=None):
        """DELETE with optional WHERE clause on metadata."""

    def collections(self) -> list[str]:
        """List all indexed collections."""

    def count(self, collection: str = "default") -> int:
        """Count documents in a collection."""

How toolkits use the backend:

# VideoToolkit.setup() indexes subtitle segments
backend.index(
    documents=[seg.text for seg in subtitle_segments],
    metadata=[{"start_s": seg.start_s, "end_s": seg.end_s, "idx": i}
              for i, seg in enumerate(subtitle_segments)],
    collection="transcripts",
)

# list_windows() queries it for scoring
hits = backend.search(question, top_k=top_k * 2, collection="transcripts")

# ReportToolkit.save_note() indexes saved content
backend.index(
    documents=[content],
    metadata=[{"filename": filename, "run_id": run_id, "timestamp": time.time()}],
    collection="reports",
)

Tiered upgrade path (documented, not implemented now):

Tier  Backend                      Setup          Indexing  Search                      Persistence
0     TokenOverlapBackend          None           None      In-memory scoring           No
1     SQLiteFTSBackend (default)   None           <100ms    BM25, <10ms                 .sanjaya/retrieval.db
2     NextPlaidBackend (future)    Docker server  1-5s      ColBERT semantic, 50-500ms  Server-side

6. sanjaya.tools.report.ReportToolkit

Saves analysis artifacts to disk and auto-indexes them in the retrieval backend for cross-run memory.

class ReportToolkit(Toolkit):
    """Save analysis outputs and build cross-run memory."""

    def __init__(
        self,
        output_dir: str = "./sanjaya_reports",
        retrieval: RetrievalBackend | None = None,
    ):
        """
        Args:
            output_dir: Directory for saved files.
            retrieval: If provided, saved content is auto-indexed
                in the "reports" collection for cross-run search.
        """

    def tools(self) -> list[Tool]:
        """Returns 3 report tools."""

Report Tools (3 total)

def save_note(content: str, filename: str) -> str:
    """Save a markdown/text note to the reports directory.

    Content is also indexed in the retrieval backend (if configured)
    so future runs can find it via the retrieval backend's search.

    Args:
        content: The text content to save.
        filename: Filename (e.g., "analysis.md", "findings.txt").

    Returns: Absolute path to the saved file.
    """

def save_qmd(content: str, filename: str) -> str:
    """Save a Quarto markdown document to the reports directory.

    Use this for structured reports with YAML frontmatter.
    Auto-indexed for cross-run retrieval.

    Args:
        content: Full QMD content including frontmatter.
        filename: Filename (e.g., "report.qmd").

    Returns: Absolute path to the saved file.
    """

def save_data(data: Any, filename: str) -> str:
    """Save structured data (JSON, CSV) to the reports directory.

    Args:
        data: Dict/list for JSON, or list[list] for CSV.
        filename: Filename with extension (e.g., "results.json", "data.csv").

    Returns: Absolute path to the saved file.
    """

7. Monty REPL Failure Handling

Three layers to handle pydantic-monty’s limited Python coverage.

Layer 1: Prompt-level constraints (prevent failures)

Documented in the system prompt upfront:

You are in a sandboxed REPL (pydantic-monty). Available:
- Standard data types: list, dict, set, tuple, str, int, float, bool
- String operations, f-strings, list comprehensions, slicing, unpacking
- math, re, json, collections, itertools, functools
- All registered tools (see below)

NOT available: os, sys, subprocess, pathlib, importlib, open(),
file I/O, network access, eval(), exec(). Use the provided tools
for all external operations (file saving, video processing, LLM calls).

Layer 2: Smart error feedback (guide recovery)

When Monty raises a specific error, format it with a hint instead of raw traceback:

# core/errors.py

MONTY_HINTS: dict[type, str] = {
    ModuleNotFoundError: (
        "This module is not available in the sandboxed REPL. "
        "Use the provided tools for external operations: "
        "get_video_info() for metadata, extract_clip() for media, "
        "save_note() for file output, llm_query() for LLM calls."
    ),
    FileNotFoundError: (
        "Direct file access is not available. Use the provided tools: "
        "get_video_info(), extract_clip(), sample_frames(), save_note()."
    ),
    PermissionError: (
        "The REPL is sandboxed. Use save_note() or save_data() to write files."
    ),
}

def format_error_with_hints(exc: Exception, registry: ToolRegistry) -> str:
    """Format an error with Monty-specific recovery hints.

    Includes: the error message, a hint based on error type,
    and a list of available tools that might help.
    """

Layer 3: Pattern memory (learn working code across runs)

# core/patterns.py

class PatternMemory:
    """Learn what code works/fails in the REPL across runs.

    Stores patterns in a SQLite database alongside the retrieval backend.
    Successful patterns are injected into the system prompt as worked examples.
    Failed patterns are injected as warnings.
    """

    def __init__(self, path: str = ".sanjaya/patterns.db"): ...

    def record_success(
        self,
        code: str,
        tools_used: list[str],
        description: str | None = None,
    ) -> None:
        """Record a code block that executed successfully."""

    def record_failure(
        self,
        code: str,
        error_type: str,
        error_message: str,
    ) -> None:
        """Record a code block that failed."""

    def get_examples(
        self,
        tools: list[str],
        limit: int = 3,
    ) -> list[dict]:
        """Get proven code examples relevant to the registered tools.

        Returns: [{"code": str, "description": str, "tools_used": [...]}]
        Used by build_system_prompt() to inject worked examples.
        """

    def get_anti_patterns(self, limit: int = 5) -> list[dict]:
        """Get common failure patterns to warn about.

        Returns: [{"code_snippet": str, "error": str, "hint": str}]
        """

Integration with the loop:

# In core/loop.py run_loop():
for code_block in code_blocks:
    result = repl.execute(code_block)
    if result.error:
        # Format with hints
        feedback = format_error_with_hints(result.error, registry)
        # Record failure for future runs
        pattern_memory.record_failure(code_block, type(result.error).__name__, str(result.error))
    else:
        # Record success for future runs
        tools_used = [t.name for t in registry.all_tools() if t.name in code_block]
        pattern_memory.record_success(code_block, tools_used)

Integration with prompt building:

# In core/prompts.py build_system_prompt():
examples = pattern_memory.get_examples(tools=[t.name for t in registry.all_tools()])
anti_patterns = pattern_memory.get_anti_patterns()

# Inject into prompt:
# "Here are proven code patterns that work in this REPL:"
# "WARNING: These patterns are known to fail in this REPL:"

8. sanjaya.core β€” The Engine

core.repl.AgentREPL

Single REPL wrapper replacing both MontyREPL and VideoMontyREPL.

class AgentREPL:
    """Sandboxed Python REPL with dynamic tool injection."""

    def __init__(
        self,
        registry: ToolRegistry,
        context: Any = None,
        os_access: OSAccess | None = None,
    ):
        self.monty = MontyRepl()
        self.registry = registry
        self.context = context
        self._os_access = os_access

    def execute(
        self,
        code: str,
        *,
        iteration: int | None = None,
        block_index: int | None = None,
        block_total: int | None = None,
    ) -> ExecutionResult:
        """Execute a code block in the sandbox.

        Injects all registered tools + builtins (context, llm_query,
        llm_query_batched, done, get_state) as external_functions.
        """

    def set_context(self, context: Any) -> None:
        """Update the context variable."""

    def set_os_access(self, os_access: OSAccess | None) -> None:
        """Update the Monty filesystem mount."""

core.loop.run_loop

The single iteration loop. Replaces the duplicated loop logic in RLM_REPL.completion() and VideoRLM_REPL.completion().

@dataclass
class LoopConfig:
    max_iterations: int = 20
    max_budget_usd: float | None = None
    max_timeout_s: float | None = None
    compaction_threshold: float = 0.85
    force_all_iterations: bool = False

@dataclass
class LoopResult:
    raw_answer: Any              # whatever done() received
    iterations_used: int
    messages: list[dict]         # full message history
    budget: BudgetTracker
    wall_time_s: float


def run_loop(
    *,
    orchestrator: LLMClient,
    repl: AgentREPL,
    system_prompt: str,
    question: str,
    config: LoopConfig,
    budget: BudgetTracker,
    tracer: Tracer,
) -> LoopResult:
    """The RLM iteration loop.

    1. Build initial messages [system, user]
    2. For each iteration:
       a. Check budget/timeout
       b. Call orchestrator LLM
       c. Extract code blocks
       d. Execute each block in REPL
       e. Format feedback, append to messages
       f. Check for done() signal
       g. If approaching context limit, compact history
    3. If max iterations reached, force final answer
    4. Return LoopResult
    """

core.compaction.compact_history

def compact_history(
    *,
    messages: list[dict[str, str]],
    llm: LLMClient,
    system_prompt: str,
    threshold_pct: float = 0.85,
    model_context_limit: int = 200_000,
) -> list[dict[str, str]]:
    """Summarize message history when approaching context limit.

    Asks the LLM to summarize progress in 1-3 paragraphs,
    preserving key intermediate results. Replaces history with
    [system_prompt, summary, "continue from summary"].

    Returns the new (shorter) message list, or the original
    if under threshold.
    """

core.budget.BudgetTracker

class BudgetTracker:
    """Tracks cumulative cost and tokens across the run."""

    def __init__(
        self,
        max_budget_usd: float | None = None,
        max_timeout_s: float | None = None,
    ): ...

    def record(
        self,
        input_tokens: int = 0,
        output_tokens: int = 0,
        cost_usd: float = 0.0,
        model: str | None = None,
    ) -> None: ...

    @property
    def total_cost_usd(self) -> float: ...

    @property
    def total_input_tokens(self) -> int: ...

    @property
    def total_output_tokens(self) -> int: ...

    @property
    def elapsed_s(self) -> float: ...

    @property
    def budget_exceeded(self) -> bool: ...

    @property
    def timeout_exceeded(self) -> bool: ...

    def should_stop(self) -> bool:
        """True if any limit is exceeded."""

core.prompts.build_system_prompt

Auto-generates the system prompt from the tool registry. Follows the alexzhang13/rlm prompt philosophy: teach by example, document every tool, include strategy guidance.

def build_system_prompt(
    *,
    registry: ToolRegistry,
    context_metadata: dict[str, Any] | None = None,
    toolkit_sections: list[str] | None = None,
) -> str:
    """Build the full system prompt.

    Structure:
    1. Core RLM instructions (you have a REPL, use it to solve problems)
    2. Built-in functions (context, llm_query, llm_query_batched, done, get_state)
    3. Auto-generated tool docs (from registry)
    4. Toolkit-specific strategy sections (e.g., video analysis workflow)
    5. Context metadata (type, size, chunk lengths)
    6. Examples and guardrails
    """

core.blocks

def extract_code_blocks(response: str) -> list[str]:
    """Extract ```python or ```repl code blocks from LLM response."""

def format_execution_feedback(result: ExecutionResult, block_index: int) -> str:
    """Format REPL output as a user message for the next iteration."""

def extract_final_answer(result: ExecutionResult, response: str) -> Any | None:
    """Check for done() signal, FINAL(...), or 'final answer:' in response."""

9. sanjaya.llm β€” LLM Layer

llm.client.LLMClient

Unified client. Replaces both utils/llm.py:LLMClient and video_llm.py:VideoLLMClient.

class LLMClient:
    """Unified LLM client with text, vision, and batched support."""

    def __init__(
        self,
        model: str,
        vision_model: str | None = None,
        fallback_model: str | None = None,
    ): ...

    def completion(self, prompt: str | list[dict], timeout: int = 300) -> str:
        """Single text completion."""

    def vision_completion(
        self,
        *,
        prompt: str,
        frame_paths: list[str] | None = None,
        clip_paths: list[str] | None = None,
        timeout: int = 300,
    ) -> str:
        """Single vision completion with image/video attachments."""

    def completion_batched(
        self,
        prompts: list[str],
        timeout: int = 300,
    ) -> list[str]:
        """Concurrent text completions via asyncio.gather.
        Returns responses in same order as input prompts.
        """

    def vision_completion_batched(
        self,
        queries: list[dict],
        timeout: int = 300,
    ) -> list[str]:
        """Concurrent vision completions.
        Each query dict has keys: prompt, frame_paths, clip_paths.
        """

    @property
    def last_usage(self) -> UsageSnapshot | None: ...

    @property
    def last_cost_usd(self) -> float | None: ...
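The batched methods are thin wrappers over `asyncio.gather`, which preserves input order. A sketch of the pattern only; `_complete` stands in for the real async model call (it is not the pydantic-ai API):

```python
import asyncio

async def _complete(prompt: str) -> str:
    # Stand-in for one async model call; assumed for illustration.
    await asyncio.sleep(0)  # yield point where the network call would be
    return f"echo: {prompt}"

def completion_batched(prompts: list[str], timeout: int = 300) -> list[str]:
    """Run all prompts concurrently; responses line up with input order."""
    async def _run() -> list[str]:
        return await asyncio.gather(
            *(asyncio.wait_for(_complete(p), timeout) for p in prompts)
        )
    return asyncio.run(_run())
```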

7. sanjaya.tracing β€” Unified Tracing

Single Tracer with generic span methods. No more text/video duplication.

class Tracer:
    """Unified tracer for logfire spans + in-memory SSE events."""

    def __init__(self, track_events: bool = False): ...

    # ── Generic spans (replace all 28 duplicated methods) ──

    @contextmanager
    def completion(self, *, question: str, model: str, **kwargs) -> TraceContext:
        """Top-level agent.ask() span."""

    @contextmanager
    def iteration(self, *, iteration: int, **kwargs) -> TraceContext:
        """One orchestrator loop iteration."""

    @contextmanager
    def orchestrator_call(self, *, model: str, **kwargs) -> TraceContext:
        """Orchestrator LLM call."""

    @contextmanager
    def code_execution(self, *, code: str, **kwargs) -> TraceContext:
        """REPL code block execution."""

    @contextmanager
    def tool_call(self, *, tool_name: str, **kwargs) -> TraceContext:
        """Any tool invocation (generic β€” works for video, custom, etc.)."""

    @contextmanager
    def llm_call(self, *, model: str, prompt: str, **kwargs) -> TraceContext:
        """Sub-LLM call (llm_query, vision_query, etc.)."""

    # ── Events (for SSE) ──

    @property
    def events(self) -> list[dict]:
        """All emitted events, for SSE polling."""

    def emit(self, kind: str, **payload) -> None:
        """Emit a named event."""

Implementation Phases

Phase 1: Foundation β€” Models, Settings, Tool System

Set up the new package structure, core models, and the tool abstraction.

Files to create:

- src/sanjaya/answer.py β€” Answer, Evidence
- src/sanjaya/tools/__init__.py β€” exports
- src/sanjaya/tools/base.py β€” Tool, ToolParam, Toolkit, @tool decorator
- src/sanjaya/tools/registry.py β€” ToolRegistry
- src/sanjaya/tools/builtins.py β€” context, llm_query, llm_query_batched, done, get_state

Files to modify:

- pyproject.toml β€” rename to sanjaya, update metadata
- src/sanjaya/__init__.py β€” new exports

Success Criteria:

Automated:

- [ ] uv run python -c "from sanjaya import tool, Toolkit, Answer" works
- [ ] uv run python -c "from sanjaya.tools import ToolRegistry; r = ToolRegistry(); print(r)" works
- [ ] @tool decorator produces a valid Tool from a function with a docstring
- [ ] ToolRegistry.generate_tool_docs() produces readable tool documentation
- [ ] uv run ruff check src/sanjaya/

Manual:

- [ ] Tool docs output looks good for 3+ tools with varying signatures
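The @tool criterion above can be sketched as a decorator that derives everything from the function itself; the `Tool` field names here are assumptions about the Phase 1 models:

```python
import inspect
from dataclasses import dataclass, field
from typing import Any, Callable

@dataclass
class Tool:
    name: str
    description: str
    fn: Callable[..., Any]
    parameters: dict[str, str] = field(default_factory=dict)

def tool(fn: Callable[..., Any]) -> Tool:
    """Sketch: name, description, and parameters come from the function."""
    sig = inspect.signature(fn)
    return Tool(
        name=fn.__name__,
        description=inspect.getdoc(fn) or "",
        fn=fn,
        parameters={name: str(p.annotation) for name, p in sig.parameters.items()},
    )
```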


Phase 2: LLM Layer β€” Unified Client with Batching

Merge LLMClient and VideoLLMClient into a single client. Add batched completions.

Files to create:

- src/sanjaya/llm/__init__.py
- src/sanjaya/llm/client.py β€” unified LLMClient
- src/sanjaya/llm/types.py β€” UsageSnapshot, CallMetadata

Files to delete:

- src/sanjaya/utils/llm.py (absorbed into llm/client.py)
- src/sanjaya/video_llm.py (absorbed into llm/client.py)

Key implementation:

- completion_batched uses asyncio.gather over individual Agent.run() calls
- vision_completion_batched does the same for vision calls
- Keep the fallback_model cascade from current LLMClient
- Keep provider-aware API key injection from _ensure_api_key()

Success Criteria:

Automated:

- [ ] LLMClient("openrouter:openai/gpt-4.1-mini").completion("hello") returns a string
- [ ] completion_batched(["hello", "world"]) returns list[str] of length 2
- [ ] vision_completion(prompt="describe", frame_paths=["test.jpg"]) works
- [ ] Fallback model is used when primary fails
- [ ] uv run ruff check src/sanjaya/


Phase 3: Core Engine β€” REPL, Loop, Compaction, Budget

Build the single iteration engine that replaces both orchestrator loops.

Files to create:

- src/sanjaya/core/__init__.py
- src/sanjaya/core/repl.py β€” AgentREPL
- src/sanjaya/core/loop.py β€” run_loop, LoopConfig, LoopResult
- src/sanjaya/core/compaction.py β€” compact_history
- src/sanjaya/core/budget.py β€” BudgetTracker
- src/sanjaya/core/blocks.py β€” code extraction, feedback formatting, final answer detection
- src/sanjaya/core/prompts.py β€” build_system_prompt

Files to delete:

- src/sanjaya/repl.py (absorbed into core/repl.py)
- src/sanjaya/rlm_repl.py (absorbed into core/loop.py)
- src/sanjaya/rlm.py (no longer needed β€” Agent is the only entry point)
- src/sanjaya/models.py (CodeResponse no longer needed)
- src/sanjaya/utils/utils.py (absorbed into core/blocks.py)
- src/sanjaya/utils/prompts.py (absorbed into core/prompts.py)
- src/sanjaya/video_utils.py (absorbed into core/blocks.py)
- src/sanjaya/video_prompts.py (absorbed into video toolkit’s prompt_section())

Key implementation:

- AgentREPL.execute() calls MontyRepl.feed_run() with external_functions built from the registry
- run_loop is the single loop: call orchestrator β†’ extract blocks β†’ execute β†’ check done β†’ optional compaction
- build_system_prompt auto-generates tool docs from registry, injects toolkit strategy sections
- compact_history asks the sub_model to summarize when messages approach context limit
- BudgetTracker accumulates cost/tokens, checked at the top of each iteration

Success Criteria:

Automated:

- [ ] AgentREPL executes code with external functions from a ToolRegistry
- [ ] run_loop completes when done() is called in REPL code
- [ ] run_loop stops at max_iterations and forces a final answer
- [ ] BudgetTracker.should_stop() returns True when budget exceeded
- [ ] compact_history returns shorter messages when over threshold
- [ ] build_system_prompt includes auto-generated tool docs
- [ ] uv run ruff check src/sanjaya/

Manual:

- [ ] System prompt reads well, tool docs are clear and complete
- [ ] Compacted history preserves key intermediate results


Phase 4: Video Toolkit

Port all video tools into the Toolkit abstraction.

Files to create:

- src/sanjaya/tools/video/__init__.py
- src/sanjaya/tools/video/toolkit.py β€” VideoToolkit
- src/sanjaya/tools/video/retrieval.py β€” list_windows
- src/sanjaya/tools/video/media.py β€” get_video_info, extract_clip, sample_frames
- src/sanjaya/tools/video/vision.py β€” vision_query, vision_query_batched
- src/sanjaya/tools/video/transcription.py β€” subtitle resolution
- src/sanjaya/tools/video/workspace.py β€” ArtifactWorkspace
- src/sanjaya/tools/video/mount.py β€” WorkspaceMount

Files to delete:

- src/sanjaya/video_repl.py (logic moved into toolkit tools)
- src/sanjaya/video_rlm_repl.py (loop logic now in core/loop.py)
- src/sanjaya/video_tools/ (entire directory β€” replaced by tools/video/)
- src/sanjaya/video_models.py (video-specific models move into toolkit or answer.py)

Key implementation:

- VideoToolkit.__init__ accepts optional RetrievalBackend; defaults to SQLiteFTSBackend(".sanjaya/retrieval.db")
- VideoToolkit.setup() resolves subtitles, loads transcript, indexes segments in retrieval backend, creates workspace, builds OSAccess mount
- VideoToolkit.tools() returns all 7 Tool objects
- VideoToolkit.prompt_section() returns video strategy guidance + formatted transcript
- VideoToolkit.build_evidence() converts clip manifest into Evidence items
- VideoToolkit.get_state() returns clips, windows, visited ranges, and manifest
- Progressive scanning state (_visited_window_ids, _visited_ranges) lives in the toolkit instance
- list_windows uses retrieval_backend.search() for subtitle scoring instead of _overlap_score
- vision_query_batched uses LLMClient.vision_completion_batched
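The `_visited_ranges` bookkeeping reduces to interval merging. A sketch of that one piece, with hypothetical names:

```python
def mark_visited(
    visited: list[tuple[float, float]], new: tuple[float, float]
) -> list[tuple[float, float]]:
    """Merge a newly visited (start_s, end_s) window into sorted visited ranges."""
    merged: list[tuple[float, float]] = []
    for start, end in sorted(visited + [new]):
        if merged and start <= merged[-1][1]:
            # Overlaps or touches the previous range: extend it.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged
```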

Success Criteria:

Automated:

- [ ] VideoToolkit().tools() returns 7 Tool objects
- [ ] Each tool has name, description, parameters, return_type
- [ ] from sanjaya.tools.video import VideoToolkit works
- [ ] uv run ruff check src/sanjaya/

Manual:

- [ ] VideoToolkit.prompt_section() output is clear and instructive
- [ ] End-to-end: Agent().use(VideoToolkit()).ask("...", video="test.mp4") produces a VideoAnswer-equivalent Answer


Phase 5: Retrieval Backend + Report Toolkit + Error Handling

Build the shared retrieval layer, report toolkit, and Monty failure handling.

Files to create:

- src/sanjaya/retrieval/__init__.py
- src/sanjaya/retrieval/base.py β€” RetrievalBackend ABC
- src/sanjaya/retrieval/sqlite_fts.py β€” SQLiteFTSBackend
- src/sanjaya/retrieval/token_overlap.py β€” TokenOverlapBackend (legacy compat)
- src/sanjaya/tools/report/__init__.py
- src/sanjaya/tools/report/toolkit.py β€” ReportToolkit
- src/sanjaya/tools/report/writers.py β€” save_note, save_qmd, save_data
- src/sanjaya/core/errors.py β€” MONTY_HINTS, format_error_with_hints
- src/sanjaya/core/patterns.py β€” PatternMemory

Key implementation:

- SQLiteFTSBackend uses FTS5 virtual tables with BM25 ranking
- ReportToolkit writes files AND indexes content in the retrieval backend
- PatternMemory records successful/failed code blocks in a separate SQLite table
- format_error_with_hints maps exception types to helpful recovery messages
- Pattern memory feeds into build_system_prompt as worked examples / anti-patterns
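The FTS5 backend needs nothing beyond the stdlib. A sketch, assuming the build of SQLite includes FTS5 (standard in CPython) and with hypothetical table/column names:

```python
import sqlite3

class SQLiteFTSBackend:
    """Sketch: FTS5 virtual table with BM25 ranking."""

    def __init__(self, path: str = ":memory:"):
        self.conn = sqlite3.connect(path)
        self.conn.execute("CREATE VIRTUAL TABLE IF NOT EXISTS docs USING fts5(body)")

    def index(self, texts: list[str]) -> None:
        self.conn.executemany("INSERT INTO docs(body) VALUES (?)", [(t,) for t in texts])
        self.conn.commit()

    def search(self, query: str, limit: int = 10) -> list[str]:
        # bm25() returns lower-is-better scores, so ascending order ranks best first.
        rows = self.conn.execute(
            "SELECT body FROM docs WHERE docs MATCH ? ORDER BY bm25(docs) LIMIT ?",
            (query, limit),
        )
        return [body for (body,) in rows]
```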

Success Criteria:

Automated:

- [ ] SQLiteFTSBackend(":memory:").index(["hello world"]); .search("hello") returns results
- [ ] SQLiteFTSBackend("test.db") persists across instantiations
- [ ] ReportToolkit save_note writes file and indexes content
- [ ] PatternMemory stores and retrieves successful code patterns
- [ ] format_error_with_hints(ModuleNotFoundError(...)) includes helpful hint
- [ ] uv run ruff check src/sanjaya/

Manual:

- [ ] Cross-run retrieval: index in run 1, search in run 2 finds it
- [ ] Pattern memory: failed code in run 1, warning appears in run 2’s prompt


Phase 6: Unified Tracing

Replace the 28-method tracer with a generic one.

Files to create:

- src/sanjaya/tracing/__init__.py
- src/sanjaya/tracing/tracer.py β€” unified Tracer
- src/sanjaya/tracing/events.py β€” event buffer for SSE
- src/sanjaya/tracing/observability.py β€” logfire configure (moved from observability.py)

Files to delete:

- src/sanjaya/tracing.py (replaced by tracing/tracer.py)
- src/sanjaya/observability.py (replaced by tracing/observability.py)

Key implementation:

- 6 generic span methods instead of 28 specialized ones
- tool_call span automatically captures tool name, args, result, duration
- Events are emitted automatically when tracing is enabled
- TraceContext stays the same (record, record_usage, record_error, record_llm_cost)

Success Criteria:

Automated:

- [ ] Tracer(track_events=True) emits events on span enter/exit
- [ ] tracer.events returns list of dicts with kind/timestamp/payload
- [ ] All span methods create logfire spans when logfire is configured
- [ ] Graceful degradation: works without logfire installed
- [ ] uv run ruff check src/sanjaya/


Phase 7: Agent β€” Putting It All Together

Wire everything together in the Agent class.

Files to create:

- src/sanjaya/agent.py β€” Agent

Files to modify:

- src/sanjaya/__init__.py β€” final exports

Files to delete (cleanup):

- src/sanjaya/logger/ (Rich console output moves to tracing or agent)
- src/sanjaya/utils/ (empty after prior phases)

Key implementation:

- Agent.__init__ creates LLMClient, BudgetTracker, Tracer, ToolRegistry
- Agent.use() registers tools/toolkits in the registry
- Agent.ask():
    1. If video= provided and no VideoToolkit registered, auto-register one
    2. Build context from args (context, video metadata, etc.)
    3. Call toolkit.setup(context) for each registered toolkit
    4. Build system prompt via core.prompts.build_system_prompt(registry=...)
    5. Create AgentREPL with registry and context
    6. Run core.loop.run_loop(...)
    7. Collect evidence from all toolkits via toolkit.build_evidence()
    8. Call toolkit.teardown() for each toolkit
    9. Build and return Answer

Success Criteria:

Automated:

- [ ] Agent().ask("hello", context="world") returns an Answer
- [ ] Agent().use(VideoToolkit()).ask("...", video="test.mp4") returns an Answer with evidence
- [ ] Agent().use(custom_tool).ask(...) makes the custom tool available in the REPL
- [ ] Budget tracking: agent.cost_so_far reflects actual spend
- [ ] agent.reset() clears all state
- [ ] uv run ruff check src/sanjaya/

Manual:

- [ ] Jupyter notebook workflow works end-to-end
- [ ] Video QA quality matches current implementation on sample questions
- [ ] Custom tool example works (register a mock tool, agent uses it)


Phase 8: Package Polish + API Server Adapter

Final cleanup: pyproject.toml, README, API server thin adapter.

Files to modify:

- pyproject.toml β€” name=sanjaya, clean deps, add [project.optional-dependencies] for video extras
- api/pyproject.toml β€” update import paths
- api/sanjaya_api/services/orchestrator.py β€” thin adapter to new Agent.ask()

Key implementation:

- Core package: uv add sanjaya (no ffmpeg requirement)
- Video extras: uv add sanjaya[video] (pulls in ffmpeg-python or notes ffmpeg requirement)
- API server adapter: create Agent with tracer, call agent.ask(), poll tracer.events
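The "core works without ffmpeg" split implies video tools probe PATH up front and fail with a clear message. A sketch of that check; the function name and message wording are illustrative:

```python
import shutil

def require_ffmpeg() -> str:
    """Return the ffmpeg path, or raise a clear error when it's missing."""
    path = shutil.which("ffmpeg")
    if path is None:
        raise RuntimeError(
            "ffmpeg not found on PATH. Install ffmpeg to use the video toolkit; "
            "the core sanjaya package works without it."
        )
    return path
```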

pyproject.toml:

[project]
name = "sanjaya"
version = "0.2.0"
description = "Extensible RLM agent framework with video understanding"
requires-python = ">=3.12"
dependencies = [
    "pydantic>=2.0",
    "pydantic-ai>=0.2.0",
    "pydantic-monty>=0.0.9",
    "pydantic-settings>=2.0",
    "python-dotenv>=1.0",
    "rich>=14.0",
    "genai-prices>=0.0.56",
]

[project.optional-dependencies]
video = ["logfire"]  # ffmpeg/ffprobe must be on PATH
tracing = ["logfire"]
all = ["sanjaya[video,tracing]"]

Success Criteria:

Automated:

- [ ] uv pip install -e . works
- [ ] uv pip install -e ".[video]" works
- [ ] from sanjaya import Agent, tool, Toolkit, Answer works
- [ ] from sanjaya.tools.video import VideoToolkit works
- [ ] API server starts and handles a /runs request
- [ ] uv run ruff check src/

Manual:

- [ ] Full Jupyter notebook demo works (text + video + custom tool)
- [ ] API server SSE streaming works with new Agent


Migration Path for API Server

The API server (api/) stays separate but needs a thin adapter:

# api/sanjaya_api/services/orchestrator.py (new version)
from sanjaya import Agent
from sanjaya.tools.video import VideoToolkit
from sanjaya.tracing import Tracer

def _run_completion(record: RunRecord, request: RunRequest):
    tracer = Tracer(track_events=True)
    record.tracer = tracer

    agent = Agent(
        model="openrouter:openai/gpt-5.3-codex",
        max_iterations=request.max_iterations,
        tracing=True,
    )
    agent.use(VideoToolkit(subtitle_mode=request.subtitle_mode))
    # Inject the tracer (agent accepts external tracer for SSE use case)
    agent._tracer = tracer

    record.answer = agent.ask(
        request.question,
        video=request.video_path,
        subtitle=request.subtitle_path,
    )
    record.status = "complete"

Key Design Principles

  1. β€œProgram, don’t prompt” β€” The LLM writes Python code that calls tools. The framework provides the sandbox and tools.

  2. More tools = more RLM, not less β€” Adding tools gives the model more capabilities to program with. The model still decides when, how, and whether to use each tool. What makes it LESS RLM: hardcoding β€œalways call X before Y” in the pipeline. What makes it MORE RLM: giving the model X, Y, and Z and letting it write the strategy.

  3. Don’t hardcode what the model can program β€” If the model can do it with code in the REPL, don’t make it a tool. Tools exist for things the sandbox CAN’T do (shell out to ffmpeg, call LLMs, read toolkit internal state). The model can search a transcript with a list comprehension β€” it doesn’t need a search_transcript tool.

  4. Tools are the extension point β€” Everything domain-specific goes through @tool or Toolkit. The core loop is domain-agnostic.

  5. Auto-generated prompts β€” Tool docstrings become the LLM’s documentation. Good docstrings = good agent behavior.

  6. Zero-setup defaults, power-user upgrades β€” Agent().ask(...) works with zero external deps. SQLite FTS is the default retrieval backend (stdlib, no server). Next-plaid is a documented upgrade path, not a requirement.

  7. Cross-run memory via shared retrieval β€” Reports, analyses, and patterns are indexed on save. Future runs can search past work. The retrieval backend is the shared substrate across toolkits.

  8. Progressive scanning β€” Video toolkit tracks visited regions automatically. The LLM doesn’t need to manage state.

  9. Learn from failures β€” Pattern memory records what code works and fails in Monty. Successful patterns become worked examples in future prompts. Failed patterns become warnings.

  10. Graceful degradation β€” No logfire? Tracing becomes no-ops. No ffmpeg? Core works, video tools error clearly. No API key? Helpful error message.

  11. Budget-aware β€” Every LLM call accumulates cost. The loop checks budget before each iteration. The agent reports total spend.

References