Scheduled Execution: Heartbeats, Cronjobs, and Subagents #35

Closed
opened 2026-02-20 14:38:13 +00:00 by doxios · 5 comments
Collaborator

Summary

Cobot needs mechanisms for scheduled and delegated execution beyond the reactive message loop. This issue describes three related but distinct patterns.


1. Heartbeat

What: Periodic wake-up of the main agent session with full context.

Key properties:

  • Runs in main session (has access to conversation history, memory, SOUL.md)
  • Triggered periodically (e.g., every 15 minutes)
  • Agent reads heartbeat.md (or config) for instructions
  • Returns HEARTBEAT_OK if nothing needs attention
  • Can do proactive work: check inbox, commit workspace, maintenance

OpenClaw equivalent: Gateway cron injects a system event into the main session.

Config example:

heartbeat:
  enabled: true
  interval_minutes: 15
  prompt_file: heartbeat.md
  quiet_hours: "23:00-07:00"

2. Cronjobs

What: Scheduled tasks that run in isolation without prior context.

Key properties:

  • No shared context — each run is a fresh session
  • Scheduled via cron expressions or intervals
  • Can have their own system prompt / task definition
  • Output can be logged, sent somewhere, or trigger actions
  • Useful for: automated reports, periodic checks, background processing

Config example:

cron:
  jobs:
    - name: daily-summary
      schedule: "0 9 * * *"  # 9 AM daily
      prompt: "Generate a summary of yesterday's activity."
      output: telegram  # or: log, file, webhook
    - name: cleanup
      schedule: "0 3 * * 0"  # 3 AM Sundays
      prompt: "Run maintenance tasks."

3. Subagents

What: On-demand spawned sessions for delegated work.

Key properties:

  • Isolated context — only receives explicit task context
  • Spawned programmatically (by main agent or other plugins)
  • Can run in parallel
  • Returns results to spawner
  • Useful for: long-running tasks, parallel research, specialist work

API example:

# From main agent or plugin
result = await registry.spawn_subagent(
    task="Review this PR and post a comment",
    context={"pr_url": "...", "repo": "..."},
    timeout_seconds=300,
)

Tool example (for agent use):

{
  "name": "spawn_subagent",
  "parameters": {
    "task": "Research topic X and summarize",
    "timeout_minutes": 5
  }
}

Relationships

┌─────────────────────────────────────────────────┐
│                  MAIN SESSION                   │
│  (full context, memory, conversation history)   │
│                                                 │
│   ┌─────────┐                                   │
│   │Heartbeat│◄── periodic timer                 │
│   └─────────┘                                   │
│        │                                        │
│        ▼ can spawn                              │
│   ┌─────────┐      ┌─────────┐                  │
│   │Subagent │      │Subagent │  (isolated)      │
│   └─────────┘      └─────────┘                  │
└─────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────┐
│                   CRON JOBS                     │
│  (isolated sessions, no shared context)         │
│                                                 │
│   ┌───────┐  ┌───────┐  ┌───────┐               │
│   │ Job 1 │  │ Job 2 │  │ Job 3 │               │
│   └───────┘  └───────┘  └───────┘               │
└─────────────────────────────────────────────────┘

Open Questions

  1. Heartbeat injection: How does heartbeat inject into the main session? Via communication plugin? Direct agent call?

  2. Subagent lifecycle: How do subagents report back? Callback? Shared queue? Event?

  3. Resource limits: How to prevent runaway subagents/cronjobs? Timeouts, token limits?

  4. Plugin architecture:

    • One plugin per concept? (heartbeat, cron, subagent)
    • Or unified scheduler plugin with different modes?
  5. LLM sharing: Do subagents/cronjobs use the same LLM provider as main? Configurable?

  6. Concurrency: Can multiple subagents run in parallel? Limit?


  • Issue #3 (Logbot: Cron plugin)
  • OpenClaw's gateway cron + sessions_spawn model

Requesting architecture review to propose plugin structure.

## Summary Cobot needs mechanisms for scheduled and delegated execution beyond the reactive message loop. This issue describes three related but distinct patterns. --- ## 1. Heartbeat **What:** Periodic wake-up of the main agent session with full context. **Key properties:** - Runs in **main session** (has access to conversation history, memory, SOUL.md) - Triggered periodically (e.g., every 15 minutes) - Agent reads `heartbeat.md` (or config) for instructions - Returns `HEARTBEAT_OK` if nothing needs attention - Can do proactive work: check inbox, commit workspace, maintenance **OpenClaw equivalent:** Gateway cron injects a system event into the main session. **Config example:** ```yaml heartbeat: enabled: true interval_minutes: 15 prompt_file: heartbeat.md quiet_hours: "23:00-07:00" ``` --- ## 2. Cronjobs **What:** Scheduled tasks that run in isolation without prior context. **Key properties:** - **No shared context** — each run is a fresh session - Scheduled via cron expressions or intervals - Can have their own system prompt / task definition - Output can be logged, sent somewhere, or trigger actions - Useful for: automated reports, periodic checks, background processing **Config example:** ```yaml cron: jobs: - name: daily-summary schedule: "0 9 * * *" # 9 AM daily prompt: "Generate a summary of yesterday's activity." output: telegram # or: log, file, webhook - name: cleanup schedule: "0 3 * * 0" # 3 AM Sundays prompt: "Run maintenance tasks." ``` --- ## 3. Subagents **What:** On-demand spawned sessions for delegated work. **Key properties:** - **Isolated context** — only receives explicit task context - Spawned programmatically (by main agent or other plugins) - Can run in parallel - Returns results to spawner - Useful for: long-running tasks, parallel research, specialist work **API example:** ```python # From main agent or plugin result = await registry.spawn_subagent( task="Review this PR and post a comment", context={"pr_url": "...", "repo": "..."}, timeout_seconds=300, ) ``` **Tool example (for agent use):** ```json { "name": "spawn_subagent", "parameters": { "task": "Research topic X and summarize", "timeout_minutes": 5 } } ``` --- ## Relationships ``` ┌─────────────────────────────────────────────────┐ │ MAIN SESSION │ │ (full context, memory, conversation history) │ │ │ │ ┌─────────┐ │ │ │Heartbeat│◄── periodic timer │ │ └─────────┘ │ │ │ │ │ ▼ can spawn │ │ ┌─────────┐ ┌─────────┐ │ │ │Subagent │ │Subagent │ (isolated) │ │ └─────────┘ └─────────┘ │ └─────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────┐ │ CRON JOBS │ │ (isolated sessions, no shared context) │ │ │ │ ┌───────┐ ┌───────┐ ┌───────┐ │ │ │ Job 1 │ │ Job 2 │ │ Job 3 │ │ │ └───────┘ └───────┘ └───────┘ │ └─────────────────────────────────────────────────┘ ``` --- ## Open Questions 1. **Heartbeat injection:** How does heartbeat inject into the main session? Via communication plugin? Direct agent call? 2. **Subagent lifecycle:** How do subagents report back? Callback? Shared queue? Event? 3. **Resource limits:** How to prevent runaway subagents/cronjobs? Timeouts, token limits? 4. **Plugin architecture:** - One plugin per concept? (`heartbeat`, `cron`, `subagent`) - Or unified scheduler plugin with different modes? 5. **LLM sharing:** Do subagents/cronjobs use the same LLM provider as main? Configurable? 6. **Concurrency:** Can multiple subagents run in parallel? Limit? --- ## Related - Issue #3 (Logbot: Cron plugin) - OpenClaw's gateway cron + sessions_spawn model --- *Requesting architecture review to propose plugin structure.*
Author
Collaborator

Architecture Proposal: Scheduled Execution

After reviewing Cobot's plugin architecture and this issue, here's a detailed implementation proposal.

Design Philosophy

Following Cobot's principles: minimal, composable, self-sovereign. Each concept is a separate plugin, not a monolithic scheduler. Plugins can work independently or together.


Plugin Structure

cobot/plugins/
├── heartbeat/          # Phase 1
│   ├── __init__.py
│   └── plugin.py
├── cron/               # Phase 2
│   ├── __init__.py
│   └── plugin.py
└── subagent/           # Phase 3
    ├── __init__.py
    └── plugin.py

Phase 1: Heartbeat Plugin (MVP)

Goal: Periodic wake-up of main agent with full context.

Approach

Heartbeat works within the main session by injecting a synthetic message into the communication flow. This preserves full context (history, memory, SOUL.md) since the agent's respond() method handles it like any other message.

Plugin Design

class HeartbeatPlugin(Plugin):
    meta = PluginMeta(
        id="heartbeat",
        version="1.0.0",
        dependencies=["config", "communication"],
        implements={
            "session.receive": "inject_heartbeat",  # Inject into message stream
        },
        priority=35,  # After channels (30)
    )

Key insight: By implementing session.receive, heartbeat can inject messages into the normal flow. The session plugin aggregates all session.receive implementations.

Injection Method

def inject_heartbeat(self) -> list[IncomingMessage]:
    """Return heartbeat message if it's time."""
    if not self._should_trigger():
        return []
    
    # Read heartbeat instructions
    prompt = self._load_prompt()
    
    return [IncomingMessage(
        id=f"heartbeat-{int(time.time())}",
        channel_type="heartbeat",
        channel_id="system",
        sender_id="system",
        sender_name="Heartbeat",
        content=prompt,
        timestamp=datetime.now(),
        metadata={"is_heartbeat": True},
    )]

Config Schema

heartbeat:
  enabled: true
  interval_minutes: 15
  prompt_file: heartbeat.md      # Instructions for heartbeat
  quiet_hours: "23:00-07:00"     # Optional: Don't trigger during these hours
  response_channel: null          # Where to send response (null = discard)

Hook: Suppress Persistence

Heartbeat responses shouldn't clutter conversation history. Use a hook:

async def on_message_received(self, ctx: dict) -> dict:
    """Mark heartbeat messages to skip persistence."""
    if ctx.get("metadata", {}).get("is_heartbeat"):
        ctx["skip_persistence"] = True
    return ctx

Persistence plugin checks ctx["skip_persistence"] before saving.

Example heartbeat.md

This is a heartbeat check. Do useful maintenance work:

1. Check filedrop inbox for new messages
2. If you have uncommitted workspace changes, commit them
3. Review and update memory if needed

If nothing needs attention, respond with: HEARTBEAT_OK

Phase 2: Cron Plugin

Goal: Scheduled tasks in isolated sessions (no shared context).

Approach

Cronjobs need a separate mini-agent that runs in isolation. This is different from heartbeat—no history injection, no memory context.

Plugin Design

class CronPlugin(Plugin):
    meta = PluginMeta(
        id="cron",
        version="1.0.0",
        dependencies=["config"],
        extension_points=["cron.job_complete"],  # For output routing
        priority=45,
    )

    async def start(self) -> None:
        """Start cron scheduler as background task."""
        self._task = asyncio.create_task(self._run_scheduler())

Job Runner (Isolated)

async def _run_job(self, job: CronJob) -> str:
    """Run a single job in isolation."""
    llm = self._registry.get_by_capability("llm")
    
    # Minimal messages - just system prompt and job prompt
    messages = [
        {"role": "system", "content": job.system_prompt or "You are a scheduled task runner."},
        {"role": "user", "content": job.prompt},
    ]
    
    # No hooks, no history, no persistence
    response = llm.chat(messages, tools=None)  # Tools optional per-job config
    
    # Route output
    await self._route_output(job, response.content)
    
    return response.content

Config Schema

cron:
  enabled: true
  jobs:
    - name: daily-summary
      schedule: "0 9 * * *"       # cron expression
      prompt: "Generate a summary of yesterday's activity."
      system_prompt: null         # Optional override
      tools_enabled: false        # Isolated by default
      output:
        type: telegram            # telegram | log | file | webhook | filedrop
        target: "-1001234567890"  # Channel/file path/URL
    
    - name: cleanup
      schedule: "0 3 * * 0"
      prompt: "Run maintenance: clean old logs, check disk space."
      output:
        type: log

Output Routing

async def _route_output(self, job: CronJob, content: str) -> None:
    """Route job output to configured destination."""
    output_cfg = job.output
    
    if output_cfg["type"] == "telegram":
        comm = self._registry.get("communication")
        comm.send(OutgoingMessage(
            channel_type="telegram",
            channel_id=output_cfg["target"],
            content=f"**[Cron: {job.name}]**\n{content}",
        ))
    elif output_cfg["type"] == "log":
        print(f"[Cron] {job.name}: {content}", file=sys.stderr)
    elif output_cfg["type"] == "file":
        Path(output_cfg["target"]).write_text(content)
    elif output_cfg["type"] == "filedrop":
        filedrop = self._registry.get("filedrop")
        filedrop.send(output_cfg["target"], content)
    # ...

Cron Expression Parsing

Use the croniter library (add to dependencies):

from croniter import croniter

def _should_run(self, job: CronJob, now: datetime) -> bool:
    cron = croniter(job.schedule, now - timedelta(minutes=1))
    next_run = cron.get_next(datetime)
    return next_run <= now

Phase 3: Subagent Plugin

Goal: On-demand spawned sessions for delegated work.

Approach

Subagents are like cronjobs but triggered programmatically instead of scheduled. They run in isolation with explicit context passed to them.

Plugin Design

class SubagentPlugin(Plugin, ToolProvider):
    meta = PluginMeta(
        id="subagent",
        version="1.0.0",
        capabilities=["tools"],  # Provides tools (partial)
        dependencies=["config"],
        extension_points=["subagent.spawned", "subagent.complete"],
        priority=32,  # After main tools (30)
    )

API Method

async def spawn(
    self,
    task: str,
    context: dict = None,
    system_prompt: str = None,
    timeout_seconds: int = 300,
    tools_enabled: bool = False,
) -> SubagentResult:
    """Spawn an isolated subagent.
    
    Args:
        task: Task description for the subagent
        context: Dict of context data to include
        system_prompt: Override system prompt
        timeout_seconds: Kill subagent after this time
        tools_enabled: Whether subagent can use tools
    
    Returns:
        SubagentResult with output and metadata
    """

Tool Definition (for main agent)

The main agent can spawn subagents via tool:

SUBAGENT_TOOL = {
    "type": "function",
    "function": {
        "name": "spawn_subagent",
        "description": "Spawn an isolated subagent for a delegated task. Use for long-running or parallel work.",
        "parameters": {
            "type": "object",
            "properties": {
                "task": {
                    "type": "string",
                    "description": "Task description for the subagent",
                },
                "context": {
                    "type": "string", 
                    "description": "Context data (JSON string)",
                },
                "timeout_minutes": {
                    "type": "integer",
                    "description": "Timeout in minutes (default 5)",
                },
            },
            "required": ["task"],
        },
    },
}

Implementation

async def _spawn_subagent(
    self, task: str, context: str = None, timeout_minutes: int = 5
) -> str:
    """Execute subagent tool."""
    llm = self._registry.get_by_capability("llm")
    
    context_str = ""
    if context:
        try:
            ctx_data = json.loads(context)
            context_str = f"\n\n**Context:**\n```json\n{json.dumps(ctx_data, indent=2)}\n```"
        except json.JSONDecodeError:
            context_str = f"\n\n**Context:**\n{context}"
    
    prompt = f"""You are a subagent spawned for a specific task.

**Task:** {task}
{context_str}

Complete this task and report your findings. Be concise but thorough.
"""
    
    messages = [
        {"role": "system", "content": "You are a subagent. Complete your task efficiently."},
        {"role": "user", "content": prompt},
    ]
    
    try:
        response = await asyncio.wait_for(
            asyncio.to_thread(llm.chat, messages),
            timeout=timeout_minutes * 60,
        )
        return f"**Subagent result:**\n{response.content}"
    except asyncio.TimeoutError:
        return f"Error: Subagent timed out after {timeout_minutes} minutes"

Concurrency Limits

subagent:
  enabled: true
  max_concurrent: 3          # Max parallel subagents
  max_tokens_per_task: 4000  # Token budget per subagent
  default_timeout_minutes: 5
  allow_tools: false         # Default: subagents can't use tools

Integration with agent.py

Minimal changes needed to agent.py:

1. Agent Reference for Plugins

Add method to set agent reference (like set_registry):

# In agent.py after registry init
heartbeat = registry.get("heartbeat")
if heartbeat and hasattr(heartbeat, "set_agent"):
    heartbeat.set_agent(self)

2. Hook: Skip Persistence

In persistence plugin:

async def on_message_received(self, ctx: dict) -> dict:
    if ctx.get("skip_persistence"):
        self._enabled = False  # Temporarily disable
    return ctx

async def on_after_send(self, ctx: dict) -> dict:
    if ctx.get("skip_persistence"):
        self._enabled = True   # Re-enable
        return ctx
    # ... normal persistence logic

New Interface: ScheduledTask

For consistency, define a shared interface:

# interfaces.py addition
@dataclass
class ScheduledTask:
    """Configuration for a scheduled task."""
    name: str
    prompt: str
    schedule: Optional[str] = None  # Cron expression (None = on-demand)
    system_prompt: Optional[str] = None
    tools_enabled: bool = False
    timeout_seconds: int = 300
    output: Optional[dict] = None  # Output routing config

Implementation Phases

Phase 1: Heartbeat (MVP) - ~2-3 days

  • Create heartbeat/plugin.py
  • Implement session.receive extension
  • Add quiet hours logic
  • Add skip_persistence flag
  • Update persistence plugin to respect flag
  • Tests and documentation

Config additions:

heartbeat:
  enabled: false
  interval_minutes: 15

Phase 2: Cronjobs - ~3-4 days

  • Add croniter dependency
  • Create cron/plugin.py
  • Implement job runner (isolated sessions)
  • Implement output routing
  • Tests for scheduling logic
  • Document job configuration

Config additions:

cron:
  enabled: false
  jobs: []

Phase 3: Subagents - ~2-3 days

  • Create subagent/plugin.py
  • Add tool definition
  • Implement spawn API
  • Add concurrency limiting
  • Tests
  • Document usage patterns

Config additions:

subagent:
  enabled: false
  max_concurrent: 3

Tradeoffs & Concerns

1. Resource Usage

  • Each subagent/cronjob = LLM call = cost
  • Mitigation: Configurable limits, token budgets, concurrency caps

2. Heartbeat Injection

  • Injecting via session.receive is elegant but requires the communication plugin to aggregate properly
  • Alternative: Direct call to agent.respond() but requires agent reference

3. Cronjob Isolation

  • Jobs don't see main context, which is desired but could be limiting
  • Mitigation: Allow passing explicit context in job config

4. Subagent Tool Recursion

  • Can a subagent spawn subagents? Probably shouldn't.
  • Mitigation: Check nesting level, disallow by default

5. Error Handling

  • What happens if heartbeat/cron fails?
  • Mitigation: Use existing on_error hook, add retry logic

Open Questions for Discussion

  1. Heartbeat response routing: Should heartbeat responses go anywhere? Options:

    • Discard (just log)
    • FileDrop to self
    • Configurable channel
  2. Cron job tools: Should cronjobs have access to tools? Risk vs utility?

  3. Subagent result delivery: Sync (await) vs async (callback/filedrop)?


Summary

Concept Plugin Injection Context Phase
Heartbeat heartbeat session.receive Full (main session) 1
Cronjobs cron Background task Isolated 2
Subagents subagent Tool/API Explicit only 3

This approach is composable (use any subset), minimal (each plugin is small), and extensible (extension points for customization).

Ready to start on Phase 1?

# Architecture Proposal: Scheduled Execution After reviewing Cobot's plugin architecture and this issue, here's a detailed implementation proposal. ## Design Philosophy Following Cobot's principles: **minimal, composable, self-sovereign**. Each concept is a separate plugin, not a monolithic scheduler. Plugins can work independently or together. --- ## Plugin Structure ``` cobot/plugins/ ├── heartbeat/ # Phase 1 │ ├── __init__.py │ └── plugin.py ├── cron/ # Phase 2 │ ├── __init__.py │ └── plugin.py └── subagent/ # Phase 3 ├── __init__.py └── plugin.py ``` --- ## Phase 1: Heartbeat Plugin (MVP) **Goal:** Periodic wake-up of main agent with full context. ### Approach Heartbeat works **within** the main session by injecting a synthetic message into the communication flow. This preserves full context (history, memory, SOUL.md) since the agent's `respond()` method handles it like any other message. ### Plugin Design ```python class HeartbeatPlugin(Plugin): meta = PluginMeta( id="heartbeat", version="1.0.0", dependencies=["config", "communication"], implements={ "session.receive": "inject_heartbeat", # Inject into message stream }, priority=35, # After channels (30) ) ``` **Key insight:** By implementing `session.receive`, heartbeat can inject messages into the normal flow. The session plugin aggregates all `session.receive` implementations. ### Injection Method ```python def inject_heartbeat(self) -> list[IncomingMessage]: """Return heartbeat message if it's time.""" if not self._should_trigger(): return [] # Read heartbeat instructions prompt = self._load_prompt() return [IncomingMessage( id=f"heartbeat-{int(time.time())}", channel_type="heartbeat", channel_id="system", sender_id="system", sender_name="Heartbeat", content=prompt, timestamp=datetime.now(), metadata={"is_heartbeat": True}, )] ``` ### Config Schema ```yaml heartbeat: enabled: true interval_minutes: 15 prompt_file: heartbeat.md # Instructions for heartbeat quiet_hours: "23:00-07:00" # Optional: Don't trigger during these hours response_channel: null # Where to send response (null = discard) ``` ### Hook: Suppress Persistence Heartbeat responses shouldn't clutter conversation history. Use a hook: ```python async def on_message_received(self, ctx: dict) -> dict: """Mark heartbeat messages to skip persistence.""" if ctx.get("metadata", {}).get("is_heartbeat"): ctx["skip_persistence"] = True return ctx ``` Persistence plugin checks `ctx["skip_persistence"]` before saving. ### Example heartbeat.md ```markdown This is a heartbeat check. Do useful maintenance work: 1. Check filedrop inbox for new messages 2. If you have uncommitted workspace changes, commit them 3. Review and update memory if needed If nothing needs attention, respond with: HEARTBEAT_OK ``` --- ## Phase 2: Cron Plugin **Goal:** Scheduled tasks in isolated sessions (no shared context). ### Approach Cronjobs need a **separate mini-agent** that runs in isolation. This is different from heartbeat—no history injection, no memory context. ### Plugin Design ```python class CronPlugin(Plugin): meta = PluginMeta( id="cron", version="1.0.0", dependencies=["config"], extension_points=["cron.job_complete"], # For output routing priority=45, ) async def start(self) -> None: """Start cron scheduler as background task.""" self._task = asyncio.create_task(self._run_scheduler()) ``` ### Job Runner (Isolated) ```python async def _run_job(self, job: CronJob) -> str: """Run a single job in isolation.""" llm = self._registry.get_by_capability("llm") # Minimal messages - just system prompt and job prompt messages = [ {"role": "system", "content": job.system_prompt or "You are a scheduled task runner."}, {"role": "user", "content": job.prompt}, ] # No hooks, no history, no persistence response = llm.chat(messages, tools=None) # Tools optional per-job config # Route output await self._route_output(job, response.content) return response.content ``` ### Config Schema ```yaml cron: enabled: true jobs: - name: daily-summary schedule: "0 9 * * *" # cron expression prompt: "Generate a summary of yesterday's activity." system_prompt: null # Optional override tools_enabled: false # Isolated by default output: type: telegram # telegram | log | file | webhook | filedrop target: "-1001234567890" # Channel/file path/URL - name: cleanup schedule: "0 3 * * 0" prompt: "Run maintenance: clean old logs, check disk space." output: type: log ``` ### Output Routing ```python async def _route_output(self, job: CronJob, content: str) -> None: """Route job output to configured destination.""" output_cfg = job.output if output_cfg["type"] == "telegram": comm = self._registry.get("communication") comm.send(OutgoingMessage( channel_type="telegram", channel_id=output_cfg["target"], content=f"**[Cron: {job.name}]**\n{content}", )) elif output_cfg["type"] == "log": print(f"[Cron] {job.name}: {content}", file=sys.stderr) elif output_cfg["type"] == "file": Path(output_cfg["target"]).write_text(content) elif output_cfg["type"] == "filedrop": filedrop = self._registry.get("filedrop") filedrop.send(output_cfg["target"], content) # ... ``` ### Cron Expression Parsing Use the `croniter` library (add to dependencies): ```python from croniter import croniter def _should_run(self, job: CronJob, now: datetime) -> bool: cron = croniter(job.schedule, now - timedelta(minutes=1)) next_run = cron.get_next(datetime) return next_run <= now ``` --- ## Phase 3: Subagent Plugin **Goal:** On-demand spawned sessions for delegated work. ### Approach Subagents are like cronjobs but triggered programmatically instead of scheduled. They run in isolation with explicit context passed to them. ### Plugin Design ```python class SubagentPlugin(Plugin, ToolProvider): meta = PluginMeta( id="subagent", version="1.0.0", capabilities=["tools"], # Provides tools (partial) dependencies=["config"], extension_points=["subagent.spawned", "subagent.complete"], priority=32, # After main tools (30) ) ``` ### API Method ```python async def spawn( self, task: str, context: dict = None, system_prompt: str = None, timeout_seconds: int = 300, tools_enabled: bool = False, ) -> SubagentResult: """Spawn an isolated subagent. Args: task: Task description for the subagent context: Dict of context data to include system_prompt: Override system prompt timeout_seconds: Kill subagent after this time tools_enabled: Whether subagent can use tools Returns: SubagentResult with output and metadata """ ``` ### Tool Definition (for main agent) The main agent can spawn subagents via tool: ```python SUBAGENT_TOOL = { "type": "function", "function": { "name": "spawn_subagent", "description": "Spawn an isolated subagent for a delegated task. Use for long-running or parallel work.", "parameters": { "type": "object", "properties": { "task": { "type": "string", "description": "Task description for the subagent", }, "context": { "type": "string", "description": "Context data (JSON string)", }, "timeout_minutes": { "type": "integer", "description": "Timeout in minutes (default 5)", }, }, "required": ["task"], }, }, } ``` ### Implementation ```python async def _spawn_subagent( self, task: str, context: str = None, timeout_minutes: int = 5 ) -> str: """Execute subagent tool.""" llm = self._registry.get_by_capability("llm") context_str = "" if context: try: ctx_data = json.loads(context) context_str = f"\n\n**Context:**\n```json\n{json.dumps(ctx_data, indent=2)}\n```" except json.JSONDecodeError: context_str = f"\n\n**Context:**\n{context}" prompt = f"""You are a subagent spawned for a specific task. **Task:** {task} {context_str} Complete this task and report your findings. Be concise but thorough. """ messages = [ {"role": "system", "content": "You are a subagent. Complete your task efficiently."}, {"role": "user", "content": prompt}, ] try: response = await asyncio.wait_for( asyncio.to_thread(llm.chat, messages), timeout=timeout_minutes * 60, ) return f"**Subagent result:**\n{response.content}" except asyncio.TimeoutError: return f"Error: Subagent timed out after {timeout_minutes} minutes" ``` ### Concurrency Limits ```yaml subagent: enabled: true max_concurrent: 3 # Max parallel subagents max_tokens_per_task: 4000 # Token budget per subagent default_timeout_minutes: 5 allow_tools: false # Default: subagents can't use tools ``` --- ## Integration with agent.py Minimal changes needed to `agent.py`: ### 1. Agent Reference for Plugins Add method to set agent reference (like `set_registry`): ```python # In agent.py after registry init heartbeat = registry.get("heartbeat") if heartbeat and hasattr(heartbeat, "set_agent"): heartbeat.set_agent(self) ``` ### 2. Hook: Skip Persistence In persistence plugin: ```python async def on_message_received(self, ctx: dict) -> dict: if ctx.get("skip_persistence"): self._enabled = False # Temporarily disable return ctx async def on_after_send(self, ctx: dict) -> dict: if ctx.get("skip_persistence"): self._enabled = True # Re-enable return ctx # ... normal persistence logic ``` --- ## New Interface: ScheduledTask For consistency, define a shared interface: ```python # interfaces.py addition @dataclass class ScheduledTask: """Configuration for a scheduled task.""" name: str prompt: str schedule: Optional[str] = None # Cron expression (None = on-demand) system_prompt: Optional[str] = None tools_enabled: bool = False timeout_seconds: int = 300 output: Optional[dict] = None # Output routing config ``` --- ## Implementation Phases ### Phase 1: Heartbeat (MVP) - ~2-3 days - [ ] Create `heartbeat/plugin.py` - [ ] Implement `session.receive` extension - [ ] Add quiet hours logic - [ ] Add `skip_persistence` flag - [ ] Update persistence plugin to respect flag - [ ] Tests and documentation **Config additions:** ```yaml heartbeat: enabled: false interval_minutes: 15 ``` ### Phase 2: Cronjobs - ~3-4 days - [ ] Add `croniter` dependency - [ ] Create `cron/plugin.py` - [ ] Implement job runner (isolated sessions) - [ ] Implement output routing - [ ] Tests for scheduling logic - [ ] Document job configuration **Config additions:** ```yaml cron: enabled: false jobs: [] ``` ### Phase 3: Subagents - ~2-3 days - [ ] Create `subagent/plugin.py` - [ ] Add tool definition - [ ] Implement spawn API - [ ] Add concurrency limiting - [ ] Tests - [ ] Document usage patterns **Config additions:** ```yaml subagent: enabled: false max_concurrent: 3 ``` --- ## Tradeoffs & Concerns ### 1. Resource Usage - Each subagent/cronjob = LLM call = cost - Mitigation: Configurable limits, token budgets, concurrency caps ### 2. Heartbeat Injection - Injecting via `session.receive` is elegant but requires the communication plugin to aggregate properly - Alternative: Direct call to `agent.respond()` but requires agent reference ### 3. Cronjob Isolation - Jobs don't see main context, which is desired but could be limiting - Mitigation: Allow passing explicit context in job config ### 4. Subagent Tool Recursion - Can a subagent spawn subagents? Probably shouldn't. - Mitigation: Check nesting level, disallow by default ### 5. Error Handling - What happens if heartbeat/cron fails? - Mitigation: Use existing `on_error` hook, add retry logic --- ## Open Questions for Discussion 1. **Heartbeat response routing:** Should heartbeat responses go anywhere? Options: - Discard (just log) - FileDrop to self - Configurable channel 2. **Cron job tools:** Should cronjobs have access to tools? Risk vs utility? 3. **Subagent result delivery:** Sync (await) vs async (callback/filedrop)? --- ## Summary | Concept | Plugin | Injection | Context | Phase | |---------|--------|-----------|---------|-------| | Heartbeat | `heartbeat` | `session.receive` | Full (main session) | 1 | | Cronjobs | `cron` | Background task | Isolated | 2 | | Subagents | `subagent` | Tool/API | Explicit only | 3 | This approach is **composable** (use any subset), **minimal** (each plugin is small), and **extensible** (extension points for customization). Ready to start on Phase 1?
Owner

Several questions:

  • Why do we implement subagents last even though we need them for cron? If heartbeat is a special version of cronjob, why not building cron before heartbeat.
    Also heartbeat is mentioned in the agent.py. This is a nogo! No references of plugins in other code or plugins which is not extending that plugin!
Several questions: * Why do we implement subagents last even though we need them for cron? If heartbeat is a special version of cronjob, why not building cron before heartbeat. Also heartbeat is mentioned in the agent.py. This is a nogo! No references of plugins in other code or plugins which is not extending that plugin!
Author
Collaborator

Valid concerns! Let me address:

1. Implementation Order

You're right — if cronjobs need isolated sessions (subagents), then subagents should come before cron, not after.

Revised order:

  1. Subagents — isolated execution mechanism
  2. Cron — scheduler that spawns subagents for isolated jobs
  3. Heartbeat — special cron job that runs in main session (not isolated)

2. Heartbeat as Special Cron

Heartbeat could be implemented as:

cron:
  jobs:
    - name: heartbeat
      schedule: "*/15 * * * *"
      mode: main_session  # <-- key difference: runs in main context
      prompt_file: heartbeat.md

So cron plugin handles both:

  • mode: isolated → spawn subagent (no context)
  • mode: main_session → inject into main session (full context = heartbeat)

3. No Plugin References in agent.py

Agreed — this violates plugin architecture. Heartbeat/cron should not be referenced in agent.py.

How to inject into main session without touching agent.py:

Option A: Communication plugin extension

  • Cron plugin registers a synthetic message source
  • Communication plugin aggregates from all sources (including cron)
  • Agent polls as normal, sees heartbeat as a message

Option B: New extension point

  • Add scheduler.on_tick extension point in registry
  • Cron plugin implements it
  • Registry calls it during the main loop (registry already has the loop hook)

Option A keeps everything as "messages" which is cleaner.


Revised architecture:

Phase 1: Subagent plugin
  - Isolated session spawning
  - Tool: spawn_subagent
  - API: registry.spawn_subagent(task, context)

Phase 2: Cron plugin  
  - Scheduler with cron expressions
  - mode: isolated → uses subagent
  - mode: main_session → injects via communication
  - Heartbeat = cron job with mode: main_session

No changes to agent.py needed. Cron uses existing communication aggregation for main_session mode, and subagent API for isolated mode.

Does this address the concerns?

Valid concerns! Let me address: ## 1. Implementation Order You're right — if cronjobs need isolated sessions (subagents), then subagents should come **before** cron, not after. **Revised order:** 1. **Subagents** — isolated execution mechanism 2. **Cron** — scheduler that spawns subagents for isolated jobs 3. **Heartbeat** — special cron job that runs in main session (not isolated) ## 2. Heartbeat as Special Cron Heartbeat could be implemented as: ```yaml cron: jobs: - name: heartbeat schedule: "*/15 * * * *" mode: main_session # <-- key difference: runs in main context prompt_file: heartbeat.md ``` So `cron` plugin handles both: - `mode: isolated` → spawn subagent (no context) - `mode: main_session` → inject into main session (full context = heartbeat) ## 3. No Plugin References in agent.py ❌ Agreed — this violates plugin architecture. Heartbeat/cron should **not** be referenced in agent.py. **How to inject into main session without touching agent.py:** Option A: **Communication plugin extension** - Cron plugin registers a synthetic message source - Communication plugin aggregates from all sources (including cron) - Agent polls as normal, sees heartbeat as a message Option B: **New extension point** - Add `scheduler.on_tick` extension point in registry - Cron plugin implements it - Registry calls it during the main loop (registry already has the loop hook) Option A keeps everything as "messages" which is cleaner. --- **Revised architecture:** ``` Phase 1: Subagent plugin - Isolated session spawning - Tool: spawn_subagent - API: registry.spawn_subagent(task, context) Phase 2: Cron plugin - Scheduler with cron expressions - mode: isolated → uses subagent - mode: main_session → injects via communication - Heartbeat = cron job with mode: main_session ``` No changes to agent.py needed. Cron uses existing communication aggregation for main_session mode, and subagent API for isolated mode. Does this address the concerns?
Author
Collaborator

📋 Revised Architecture Proposal (v2)

Based on feedback, here's the corrected architecture.


Implementation Order

Phase Plugin Depends On Purpose
1 subagent Isolated session execution
2 cron subagent Scheduled jobs (isolated or main session)
3 heartbeat cron Convenience wrapper for periodic main-session wake-up

Phase 1: Subagent Plugin

Purpose: Spawn isolated sessions for delegated work.

cobot/plugins/subagent/
├── __init__.py
├── plugin.py      # SubagentProvider interface
└── README.md

Interface:

class SubagentProvider(ABC):
    @abstractmethod
    async def spawn(self, task: str, context: dict = None, timeout_seconds: int = 300) -> dict:
        """Spawn isolated session, return result."""
        pass

Config:

subagent:
  max_concurrent: 3
  default_timeout_seconds: 300
  # Uses same LLM provider as main agent

Tool (for agent use):

{
  "name": "spawn_subagent",
  "parameters": {
    "task": "Research X and summarize",
    "timeout_minutes": 5
  }
}

Phase 2: Cron Plugin

Purpose: Schedule jobs — either isolated (via subagent) or in main session.

cobot/plugins/cron/
├── __init__.py
├── plugin.py      # Scheduler, job management
├── parser.py      # Cron expression parsing
└── README.md

Config:

cron:
  jobs:
    - name: daily-report
      schedule: "0 9 * * *"
      mode: isolated          # Spawns subagent
      prompt: "Generate daily summary"
      
    - name: heartbeat
      schedule: "*/15 * * * *"
      mode: main_session      # Injects into main session
      prompt_file: HEARTBEAT.md
      quiet_hours: "23:00-07:00"

Modes:

  • isolated — Spawns subagent via SubagentProvider (no context)
  • main_session — Injects via communication plugin (full context)

Main session injection (no agent.py changes):

# Cron plugin registers as a message source
comm = registry.get("communication")
comm.register_source("cron", self._poll_due_jobs)

# When job is due with mode=main_session:
def _create_synthetic_message(self, job):
    return IncomingMessage(
        channel_type="internal",
        channel_id="cron",
        content=self._load_prompt(job),
        sender_name="system",
        id=f"cron-{job.name}-{timestamp}"
    )

Phase 3: Heartbeat Plugin

Purpose: Convenience wrapper for the common "periodic main-session wake-up" pattern.

cobot/plugins/heartbeat/
├── __init__.py
├── plugin.py
└── README.md

Config (simple):

heartbeat:
  enabled: true
  interval_minutes: 15
  # prompt_file: HEARTBEAT.md  (default)
  # quiet_hours: "23:00-07:00" (optional)

Implementation:

class HeartbeatPlugin(Plugin):
    meta = PluginMeta(
        id="heartbeat",
        version="1.0.0",
        dependencies=["cron"],
        priority=35,
    )

    async def start(self) -> None:
        if not self._enabled:
            return
        
        cron = self.registry.get("cron")
        cron.add_job({
            "name": "__heartbeat__",
            "schedule": f"*/{self._interval} * * * *",
            "mode": "main_session",
            "prompt_file": self._prompt_file,
            "quiet_hours": self._quiet_hours,
        })

HEARTBEAT.md convention:

## Heartbeat Tasks

1. Check filedrop inbox
2. Review pending items
3. Commit workspace changes

Reply HEARTBEAT_OK if nothing needs attention.

Key Principles

  1. No agent.py modifications — Everything via plugins and existing hooks
  2. Communication aggregation — Cron injects via comm.register_source()
  3. Subagent isolation — Fresh session, no conversation history
  4. Heartbeat = cron job — Just a convenience wrapper with sensible defaults
  5. Composable — Use any subset: just subagent, just cron, or full stack

Dependency Graph

                    ┌──────────────┐
                    │  heartbeat   │  (convenience)
                    │  depends on  │
                    └──────┬───────┘
                           │
                    ┌──────▼───────┐
                    │     cron     │  (scheduler)
                    │  depends on  │
                    └──────┬───────┘
                           │
                    ┌──────▼───────┐
                    │   subagent   │  (isolated execution)
                    │              │
                    └──────────────┘

Open Questions

  1. Communication source registration — Does the communication plugin support register_source() or do we need to add it?

  2. Subagent LLM — Same provider as main, or configurable per-job?

  3. Quiet hours — Skip entirely, or queue for later?

  4. Heartbeat response handling — Log? Discard? Send somewhere?

## 📋 Revised Architecture Proposal (v2) Based on feedback, here's the corrected architecture. --- ### Implementation Order | Phase | Plugin | Depends On | Purpose | |-------|--------|------------|----------| | 1 | `subagent` | — | Isolated session execution | | 2 | `cron` | `subagent` | Scheduled jobs (isolated or main session) | | 3 | `heartbeat` | `cron` | Convenience wrapper for periodic main-session wake-up | --- ### Phase 1: Subagent Plugin **Purpose:** Spawn isolated sessions for delegated work. ``` cobot/plugins/subagent/ ├── __init__.py ├── plugin.py # SubagentProvider interface └── README.md ``` **Interface:** ```python class SubagentProvider(ABC): @abstractmethod async def spawn(self, task: str, context: dict = None, timeout_seconds: int = 300) -> dict: """Spawn isolated session, return result.""" pass ``` **Config:** ```yaml subagent: max_concurrent: 3 default_timeout_seconds: 300 # Uses same LLM provider as main agent ``` **Tool (for agent use):** ```json { "name": "spawn_subagent", "parameters": { "task": "Research X and summarize", "timeout_minutes": 5 } } ``` --- ### Phase 2: Cron Plugin **Purpose:** Schedule jobs — either isolated (via subagent) or in main session. ``` cobot/plugins/cron/ ├── __init__.py ├── plugin.py # Scheduler, job management ├── parser.py # Cron expression parsing └── README.md ``` **Config:** ```yaml cron: jobs: - name: daily-report schedule: "0 9 * * *" mode: isolated # Spawns subagent prompt: "Generate daily summary" - name: heartbeat schedule: "*/15 * * * *" mode: main_session # Injects into main session prompt_file: HEARTBEAT.md quiet_hours: "23:00-07:00" ``` **Modes:** - `isolated` — Spawns subagent via SubagentProvider (no context) - `main_session` — Injects via communication plugin (full context) **Main session injection (no agent.py changes):** ```python # Cron plugin registers as a message source comm = registry.get("communication") comm.register_source("cron", self._poll_due_jobs) # When job is due with mode=main_session: def _create_synthetic_message(self, job): return IncomingMessage( channel_type="internal", channel_id="cron", content=self._load_prompt(job), sender_name="system", id=f"cron-{job.name}-{timestamp}" ) ``` --- ### Phase 3: Heartbeat Plugin **Purpose:** Convenience wrapper for the common "periodic main-session wake-up" pattern. ``` cobot/plugins/heartbeat/ ├── __init__.py ├── plugin.py └── README.md ``` **Config (simple):** ```yaml heartbeat: enabled: true interval_minutes: 15 # prompt_file: HEARTBEAT.md (default) # quiet_hours: "23:00-07:00" (optional) ``` **Implementation:** ```python class HeartbeatPlugin(Plugin): meta = PluginMeta( id="heartbeat", version="1.0.0", dependencies=["cron"], priority=35, ) async def start(self) -> None: if not self._enabled: return cron = self.registry.get("cron") cron.add_job({ "name": "__heartbeat__", "schedule": f"*/{self._interval} * * * *", "mode": "main_session", "prompt_file": self._prompt_file, "quiet_hours": self._quiet_hours, }) ``` **HEARTBEAT.md convention:** ```markdown ## Heartbeat Tasks 1. Check filedrop inbox 2. Review pending items 3. Commit workspace changes Reply HEARTBEAT_OK if nothing needs attention. ``` --- ### Key Principles 1. **No agent.py modifications** — Everything via plugins and existing hooks 2. **Communication aggregation** — Cron injects via `comm.register_source()` 3. **Subagent isolation** — Fresh session, no conversation history 4. **Heartbeat = cron job** — Just a convenience wrapper with sensible defaults 5. **Composable** — Use any subset: just subagent, just cron, or full stack --- ### Dependency Graph ``` ┌──────────────┐ │ heartbeat │ (convenience) │ depends on │ └──────┬───────┘ │ ┌──────▼───────┐ │ cron │ (scheduler) │ depends on │ └──────┬───────┘ │ ┌──────▼───────┐ │ subagent │ (isolated execution) │ │ └──────────────┘ ``` --- ### Open Questions 1. **Communication source registration** — Does the communication plugin support `register_source()` or do we need to add it? 2. **Subagent LLM** — Same provider as main, or configurable per-job? 3. **Quiet hours** — Skip entirely, or queue for later? 4. **Heartbeat response handling** — Log? Discard? Send somewhere?
Owner

I think the heartbeat should be enabled by default.
The cronjob plugin should have some logging: Whether it's activated at all and which cronjobs does it have configured.

I think the heartbeat should be enabled by default. The cronjob plugin should have some logging: Whether it's activated at all and which cronjobs does it have configured.
k9ert closed this issue 2026-02-22 10:17:13 +00:00
Sign in to join this conversation.
No milestone
No project
No assignees
2 participants
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
ultanio/cobot#35
No description provided.