Runtime Guide

The Runtime is the execution engine that brings a Society graph to life. You define agents, edges, and artifacts declaratively; the runtime compiles the graph, seeds initial events, drains the event queue in parallel batches, and returns a structured result.

Claw ships with LocalRuntime, a single-process async engine suitable for development, testing, and moderate-scale production use. The Runtime protocol is open for extension (e.g., distributed runtimes).

Execution Model

Every runtime.run() call follows the same four-phase loop:

Compile --> Seed --> Drain --> Return

1. Compile

The society graph is validated and compiled. Compilation checks that all edges reference valid agents, that artifacts are well-formed, and that the graph is structurally sound. Per-agent system prompts, context blocks, and available tool definitions are generated from the graph topology.

# Internally, runtime calls:
from claw.compiler.compile import compile as compile_society
compile_society(society)

If validation fails, the runtime raises before any LLM calls are made.

2. Seed

The runtime identifies entry agents -- agents that should receive the initial task. Entry detection works as follows:

  • If the graph has Delegation edges, the delegator (source of the outermost Delegation) is the entry point.
  • If there are no Delegation edges, all agents receive the initial task.
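In sketch form (illustrative types and names, not the runtime's actual internals), entry detection could look like:

```python
from dataclasses import dataclass


@dataclass
class Edge:
    kind: str      # e.g. "delegation", "oversight"
    source: str
    target: str


def find_entry_agents(agents: list[str], edges: list[Edge]) -> list[str]:
    """Return the agents that should receive the initial task."""
    delegations = [e for e in edges if e.kind == "delegation"]
    if not delegations:
        return list(agents)  # no Delegation edges: everyone gets the task
    # The outermost delegator is a delegation source that is never
    # itself the target of another delegation.
    targets = {e.target for e in delegations}
    return [e.source for e in delegations if e.source not in targets]
```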

Each entry agent gets a task_assigned event pushed onto the EventBus:

Event(
    type="task_assigned",
    source="system",
    target=agent.name,
    edge_id=edges[0].edge_id,
    data={"task": task},
)

3. Drain

The drain loop is where agents actually execute. On each iteration:

  1. Check termination conditions (budget, timeout, all edges resolved, empty queue).
  2. Pop an independent batch from the EventBus -- events whose target agents can safely execute in parallel.
  3. Execute all agents in the batch concurrently via asyncio.gather.
  4. Process results: apply artifact writes, push new events back onto the bus, track edge resolution.
  5. Repeat until a termination condition is met.

# Conceptually:
while not terminated:
    batch = bus.pop_independent_batch()
    results = await asyncio.gather(*[execute(event) for event in batch])
    for result in results:
        apply_artifact_writes(result)
        push_new_events(result)

4. Return

Once the loop terminates, the runtime builds a SocietyResult containing the full event trace, final artifact snapshots, and execution statistics.


LocalRuntime

LocalRuntime is the primary runtime implementation. It runs everything in a single Python process using asyncio for concurrency.

Basic Usage

from claw import LocalRuntime, MockLLM

llm = MockLLM()
runtime = LocalRuntime(llm)
result = await runtime.run(society, "Do the task")

Constructor Parameters

  • llm (LLM, required) -- LLM backend for agent execution (e.g., MockLLM, LiteLLMBackend).
  • observer (RuntimeObserver | None, default None) -- Lifecycle observer for monitoring events, agent starts/ends, and termination.
  • tool_executor (ToolExecutor | None, default None) -- Enables real tool execution in the ReAct loop. When None, all tool calls are converted to events.
  • max_tool_rounds (int, default 10) -- Maximum iterations of the ReAct loop per agent turn. Prevents infinite tool-calling loops.

The run() Method

async def run(
    self,
    society: Society,
    task: str,
    artifacts: list[ArtifactProtocol] | None = None,
) -> SocietyResult:

Arguments:

  • society -- The society graph to execute.
  • task -- A string describing the task to accomplish.
  • artifacts -- Optional list of pre-existing artifacts to register in the store before execution begins.

Returns: A SocietyResult (see below).

Pre-registering Artifacts

If your society works with artifacts that already have content (e.g., an existing document to review), pass them via the artifacts parameter:

from claw import StringArtifact, LocalRuntime, MockLLM

doc = StringArtifact("design-doc")
doc.write("# Design\nInitial draft...", author="setup")

runtime = LocalRuntime(MockLLM())
result = await runtime.run(society, "Review the design doc", artifacts=[doc])

SocietyResult

Every runtime.run() call returns a SocietyResult dataclass with full execution details.

result = await runtime.run(society, "task")

print(result.status)           # "completed", "budget_exceeded", "timed_out", "deadlocked"
print(result.termination)      # TerminationReason enum value
print(result.rounds)           # Number of drain loop iterations
print(result.total_llm_calls)  # Total LLM API calls across all agents
print(result.trace)            # List[Event] -- full ordered event trace
print(result.artifacts)        # Dict[str, str] -- artifact name to content snapshot

Fields

  • status (str) -- Human-readable status: "completed", "budget_exceeded", "timed_out", "deadlocked".
  • termination (TerminationReason) -- Enum value indicating why the run ended.
  • artifacts (dict[str, str]) -- Final content snapshot of every registered artifact.
  • trace (list[Event]) -- Every event that flowed through the system, in order.
  • rounds (int) -- Number of drain loop iterations.
  • total_llm_calls (int) -- Total number of LLM API calls made across all agents and ReAct rounds.
  • irreversible_actions (list[IrreversibleAction]) -- External side effects (e.g., GitHub comments posted) that cannot be rolled back.

Status Mapping

The status string is derived from the termination reason:

TerminationReason status
ALL_EDGES_RESOLVED "completed"
QUEUE_EMPTY "completed"
EDGE_RESOLVED "completed"
BUDGET_EXCEEDED "budget_exceeded"
TIMEOUT "timed_out"
DEADLOCK "deadlocked"
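The mapping is a straightforward lookup; a minimal sketch (the enum members mirror the table above, but the real class shape is assumed):

```python
from enum import Enum, auto


class TerminationReason(Enum):
    ALL_EDGES_RESOLVED = auto()
    QUEUE_EMPTY = auto()
    EDGE_RESOLVED = auto()
    BUDGET_EXCEEDED = auto()
    TIMEOUT = auto()
    DEADLOCK = auto()


_STATUS = {
    TerminationReason.ALL_EDGES_RESOLVED: "completed",
    TerminationReason.QUEUE_EMPTY: "completed",
    TerminationReason.EDGE_RESOLVED: "completed",
    TerminationReason.BUDGET_EXCEEDED: "budget_exceeded",
    TerminationReason.TIMEOUT: "timed_out",
    TerminationReason.DEADLOCK: "deadlocked",
}


def status_for(reason: TerminationReason) -> str:
    """Derive the human-readable status string from a termination reason."""
    return _STATUS[reason]
```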

Event Flow

Events are the unit of communication between agents. Unlike frameworks that broadcast globally, Claw routes events along edges.

Edge-Routed Delivery

When an agent emits an event, it travels along the edge connecting the source and target agents. The runtime automatically resolves the correct edge:

  • If agent A emits an event targeting agent B, the runtime looks up the edge between A and B and sets the edge_id accordingly.
  • For group edges (Cooperation, Competition, Coopetition), the runtime checks group membership.
  • This means agents on different edges never accidentally see each other's events.
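For a direct (non-group) edge, the lookup amounts to finding the edge whose endpoints match the event's source and target. A minimal sketch, with dict-shaped edges standing in for the real edge objects:

```python
def resolve_edge_id(source: str, target: str, edges: list[dict]) -> str:
    """Find the edge connecting source and target, in either direction."""
    for edge in edges:
        endpoints = {edge["source"], edge["target"]}
        if {source, target} <= endpoints:
            return edge["edge_id"]
    raise ValueError(f"no edge between {source} and {target}")
```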

Independent Batch Detection

The EventBus detects which events can execute in parallel. Two events are independent if and only if:

  1. Their target agents share no edges in the society graph, AND
  2. Their target agents have no overlapping artifact write sets.

The bus uses a greedy algorithm (first-come priority) to build the maximal independent batch on each drain iteration. Events that conflict with the batch are held for the next round.

# Conceptually:
batch = bus.pop_independent_batch()
# batch contains events whose targets can safely execute concurrently

This means that in a society with independent sub-graphs (e.g., two separate review pipelines), agents in different sub-graphs execute in parallel automatically.
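The greedy selection can be sketched as follows. The real EventBus internals are assumed; here `neighbors` maps each agent to the agents it shares an edge with, and `writes` maps each agent to the artifact names it may write:

```python
def pop_independent_batch(queue: list, neighbors: dict, writes: dict) -> list:
    """Greedily build a batch of mutually independent events (first-come priority)."""
    batch, held = [], []
    batch_agents: set[str] = set()
    batch_writes: set[str] = set()
    for event in queue:
        agent = event["target"]
        conflict = (
            agent in batch_agents
            # rule 1: no shared edges with any agent already in the batch
            or any(agent in neighbors.get(a, set()) for a in batch_agents)
            # rule 2: no overlapping artifact write sets
            or writes.get(agent, set()) & batch_writes
        )
        if conflict:
            held.append(event)  # deferred to the next drain round
        else:
            batch.append(event)
            batch_agents.add(agent)
            batch_writes |= writes.get(agent, set())
    queue[:] = held
    return batch
```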

Event Structure

Every event carries:

Event(
    type="comment",           # Event type identifier
    source="reviewer",        # Agent that produced the event
    target="coder",           # Agent that should receive the event
    edge_id="edge-abc-123",   # Edge this event travels along
    data={"content": "..."},  # Structured payload
    timestamp=...,            # Auto-set creation time
    sequence_id=...,          # Monotonic counter for ordering
)

Termination Conditions

The runtime checks termination conditions at the start of every drain loop iteration. The run ends as soon as any condition is met.

  • All edges resolved (ALL_EDGES_RESOLVED) -- Every edge in the society has been terminated by a resolution event (approve, reject, complete, submit, agreement, etc.).
  • Queue empty (QUEUE_EMPTY) -- No more events to process; agents have stopped emitting new work.
  • Budget exceeded (BUDGET_EXCEEDED) -- Total LLM calls reached the max_llm_calls limit set in SocietyConfig.
  • Timeout (TIMEOUT) -- Wall-clock time exceeded max_wall_time from SocietyConfig.
  • Deadlock (DEADLOCK) -- The system detected a deadlock: events exist but cannot be processed.

Configuring Limits

Termination limits are set on the society's configuration:

from datetime import timedelta
from claw import Society, SocietyConfig

society = Society(
    "my-society",
    config=SocietyConfig(
        max_llm_calls=50,                          # Default: 100
        max_wall_time=timedelta(minutes=10),        # Default: 30 minutes
    ),
)

Edge Resolution

Different edge types are resolved by different events:

Edge Type Resolution Events
Oversight approve, reject
Delegation accept, reject, complete
Cooperation complete (all agents must signal)
Competition submit (all competitors must submit)
Coopetition agreement, complete

The EdgeResolutionTracker tracks per-edge state. For Cooperation and Competition edges, resolution requires all participating agents to signal -- a single agent signaling complete is not enough.
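The all-members requirement can be sketched as follows (class and method names here are illustrative, not the real EdgeResolutionTracker API):

```python
class GroupEdgeTracker:
    """Track resolution of a group edge that requires all members to signal."""

    def __init__(self, edge_id: str, members: set[str], resolving_event: str):
        self.edge_id = edge_id
        self.members = members
        self.resolving_event = resolving_event  # e.g. "complete" or "submit"
        self.signalled: set[str] = set()

    def record(self, source: str, event_type: str) -> None:
        # Only resolution events from participating agents count.
        if event_type == self.resolving_event and source in self.members:
            self.signalled.add(source)

    @property
    def resolved(self) -> bool:
        # Cooperation/Competition: every member must have signalled.
        return self.signalled == self.members
```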


Tool Execution (ReAct Loop)

By default, LLM tool calls are converted into events. When you provide a ToolExecutor, the runtime enters a ReAct loop that executes real tools:

LLM call --> tool calls? --> execute tools --> feed results back --> LLM call --> repeat

The loop continues until the agent emits a Claw action (like emit_event or write_artifact) or hits the max_tool_rounds limit.

Setting Up Tool Execution

from pathlib import Path
from claw import (
    LocalRuntime,
    MockLLM,
    ToolExecutor,
    FileEditTool,
    ShellExecTool,
)

executor = ToolExecutor(
    file_tool=FileEditTool(workdir=Path("./repo")),
    shell_tool=ShellExecTool(
        workdir=Path("./repo"),
        command_allowlist=["pytest", "ruff"],
    ),
    dry_run=False,
)

runtime = LocalRuntime(
    MockLLM(),
    tool_executor=executor,
    max_tool_rounds=5,
)
result = await runtime.run(society, "Fix the failing tests")

Tool Call Classification

When the runtime receives tool calls from the LLM, it classifies them into two categories:

Executable tools -- dispatched to the ToolExecutor, results fed back to the LLM for the next ReAct iteration:

Tool Name Description
file_edit Read, write, or modify files via FileEditTool.
shell_exec Run shell commands via ShellExecTool.
github Interact with GitHub (PRs, comments, etc.) via GitHubTool.

Claw actions -- parsed into events or artifact writes, which exit the ReAct loop and flow back into the society:

Action Effect
emit_event Creates a new Event on the EventBus.
write_artifact Creates a WriteAction applied to the ArtifactStore.

This classification means an agent can perform multiple rounds of file editing and test running before finally emitting an event to notify another agent.
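As a sketch, the split might be implemented like this (tool-name sets taken from the tables above; the function name is illustrative):

```python
# Names from the tables above; membership determines each call's fate.
EXECUTABLE_TOOLS = {"file_edit", "shell_exec", "github"}
CLAW_ACTIONS = {"emit_event", "write_artifact"}


def classify(tool_calls: list[dict]) -> tuple[list[dict], list[dict]]:
    """Split tool calls into executable tools and Claw actions."""
    executable = [c for c in tool_calls if c["name"] in EXECUTABLE_TOOLS]
    actions = [c for c in tool_calls if c["name"] in CLAW_ACTIONS]
    return executable, actions
```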

Without a ToolExecutor

When no ToolExecutor is configured (the default), the runtime makes a single LLM call per agent turn. All tool calls in the response -- including file_edit and shell_exec -- are parsed as events. This is useful for simulation and testing where you do not need real side effects.


Dry Run

Use dry run mode to see what tools the agents would call without actually executing them:

from pathlib import Path

from claw import ToolExecutor, FileEditTool

executor = ToolExecutor(
    file_tool=FileEditTool(workdir=Path("./repo")),
    dry_run=True,
)
# Tools log actions but don't execute them
# Output will show: [DRY RUN] file_edit({...})

In dry run mode, every tool call returns a successful ToolResult with a log message showing what would have been executed. The LLM still sees a "success" response, so the ReAct loop proceeds normally -- the agent just cannot observe real tool output.


Observer Pattern

The RuntimeObserver protocol lets you hook into runtime lifecycle events for monitoring, logging, or building dashboards.

Implementing an Observer

from claw import RuntimeObserver, TerminationReason
from claw.event import Event


class MyObserver(RuntimeObserver):
    def on_event(self, event: Event) -> None:
        print(f"Event: {event.type} from {event.source} to {event.target}")

    def on_agent_start(self, agent_name: str, event: Event) -> None:
        print(f"Agent {agent_name} starting on {event.type}")

    def on_agent_end(self, agent_name: str, events: list[Event]) -> None:
        print(f"Agent {agent_name} done, emitted {len(events)} events")

    def on_terminate(self, reason: TerminationReason) -> None:
        print(f"Run terminated: {reason}")

Attaching an Observer

from claw import LocalRuntime, MockLLM

runtime = LocalRuntime(MockLLM(), observer=MyObserver())
result = await runtime.run(society, "Do the task")

Observer Callbacks

  • on_event(event) -- Called every time an event is pushed to or popped from the bus; receives the Event object.
  • on_agent_start(name, event) -- Called before an agent begins processing an event; receives the agent name and the triggering event.
  • on_agent_end(name, events) -- Called after an agent finishes processing; receives the agent name and the list of events the agent emitted.
  • on_terminate(reason) -- Called when the run ends; receives the TerminationReason.

Claw also ships with WebSocketObserver for the live dashboard (see the Dashboard Guide).


Persistence (EventLogger)

The EventLogger writes every event to an append-only JSONL file for later replay, debugging, or audit.

Logging a Run

from claw import EventLogger

logger = EventLogger(run_id="run-001", base_dir=".claw/runs")

# Log events as they occur (e.g., inside an observer):
logger.log(event)

# Log artifact snapshots:
logger.log_artifact_snapshot(name="design-doc", version=3, content="...")

# Log termination:
logger.log_termination(reason)

Each entry is a JSON object on its own line. The file is written to .claw/runs/{run_id}.jsonl.
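An "event" entry, with purely illustrative values, might look like:

```json
{"entry_type": "event", "type": "comment", "source": "reviewer", "target": "coder", "edge_id": "edge-abc-123", "data": {"content": "..."}, "timestamp": "2025-01-01T12:00:00Z", "sequence_id": 7}
```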

Replaying a Run

from claw import load_run

entries = load_run("run-001", base_dir=".claw/runs")
for entry in entries:
    if entry["entry_type"] == "event":
        print(f"{entry['source']} -> {entry['target']}: {entry['type']}")
    elif entry["entry_type"] == "artifact_snapshot":
        print(f"Artifact {entry['artifact_name']} v{entry['version']}")
    elif entry["entry_type"] == "termination":
        print(f"Terminated: {entry['reason']}")

Entry Types

The JSONL log contains three kinds of entries:

  • "event" (fields: type, source, target, edge_id, data, timestamp, sequence_id) -- A society event.
  • "artifact_snapshot" (fields: artifact_name, version, content, timestamp) -- A point-in-time snapshot of artifact content.
  • "termination" (fields: reason, timestamp) -- Why the run ended.

Integrating with the Observer

A common pattern is to wire the EventLogger into a RuntimeObserver:

from claw import EventLogger, RuntimeObserver, TerminationReason
from claw.event import Event


class LoggingObserver(RuntimeObserver):
    def __init__(self, run_id: str) -> None:
        self._logger = EventLogger(run_id)

    def on_event(self, event: Event) -> None:
        self._logger.log(event)

    def on_agent_start(self, agent_name: str, event: Event) -> None:
        pass  # Optional: log agent starts

    def on_agent_end(self, agent_name: str, events: list[Event]) -> None:
        pass  # Optional: log agent ends

    def on_terminate(self, reason: TerminationReason) -> None:
        self._logger.log_termination(reason)


runtime = LocalRuntime(llm, observer=LoggingObserver("run-001"))

Watchdog (Agent Timeouts)

The Watchdog monitors per-agent execution time and cancels turns that exceed their timeout.

from datetime import timedelta
from claw import Watchdog

watchdog = Watchdog(default_timeout=timedelta(seconds=60))

# Execute a coroutine with a timeout:
result, timed_out = await watchdog.execute_with_timeout(
    coro=some_agent_coroutine(),
    agent_name="coder",
    edge_timeout=timedelta(seconds=30),  # Optional per-edge override
)

if timed_out:
    print("Agent coder timed out!")

The global per-turn timeout defaults to 120 seconds and can be overridden per-edge. Timed-out agents are tracked and can be queried:

watchdog.timed_out_agents  # ["coder"] if coder timed out
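One plausible implementation wraps asyncio.wait_for; this is a minimal sketch, not the real Watchdog:

```python
import asyncio
from datetime import timedelta


class SimpleWatchdog:
    """Run a coroutine with a timeout, recording which agents timed out."""

    def __init__(self, default_timeout: timedelta):
        self.default_timeout = default_timeout
        self.timed_out_agents: list[str] = []

    async def execute_with_timeout(self, coro, agent_name: str, edge_timeout=None):
        timeout = (edge_timeout or self.default_timeout).total_seconds()
        try:
            return await asyncio.wait_for(coro, timeout), False
        except asyncio.TimeoutError:
            self.timed_out_agents.append(agent_name)
            return None, True
```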

LLM Retry Behavior

The LocalRuntime includes app-level retry logic for malformed LLM outputs. If the LLM returns an empty response (no content and no tool calls), the runtime retries up to 3 times with exponential backoff (1s, 2s, 4s).

HTTP-level retries (rate limits, transient errors) are handled by the LLM backend itself (e.g., LiteLLMBackend delegates to litellm's built-in retry logic).
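The app-level policy can be sketched as follows; `call_llm`, the emptiness check, and the `base_delay` knob are stand-ins for runtime internals:

```python
import asyncio


async def call_with_retry(call_llm, messages, max_retries=3, base_delay=1.0):
    """Retry empty LLM responses with exponential backoff (1s, 2s, 4s by default)."""
    delay = base_delay
    response = {}
    for attempt in range(max_retries + 1):
        response = await call_llm(messages)
        # A response with content or tool calls is accepted as-is.
        if response.get("content") or response.get("tool_calls"):
            return response
        if attempt < max_retries:
            await asyncio.sleep(delay)
            delay *= 2  # 1s, 2s, 4s
    return response  # still empty after all retries
```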


Full Example

Putting it all together -- a society with tool execution, an observer, and persistence:

import asyncio
from datetime import timedelta
from pathlib import Path

from claw import (
    Agent,
    Delegation,
    EventLogger,
    FileEditTool,
    LocalRuntime,
    MockLLM,
    Oversight,
    RuntimeObserver,
    ShellExecTool,
    Society,
    SocietyConfig,
    TerminationReason,
    ToolExecutor,
)
from claw.event import Event


# 1. Build the society
society = Society(
    "code-fix",
    config=SocietyConfig(max_llm_calls=30, max_wall_time=timedelta(minutes=5)),
)

lead = Agent("lead", role="tech lead", model="claude-sonnet")
coder = Agent("coder", role="implementer", model="claude-sonnet")
reviewer = Agent("reviewer", role="code reviewer", model="claude-sonnet")

society.add(lead, coder, reviewer)
society.connect(lead, coder, Delegation())
society.connect(coder, reviewer, Oversight(max_rounds=2))


# 2. Set up observer with logging
class PrintObserver(RuntimeObserver):
    def __init__(self, run_id: str) -> None:
        self._logger = EventLogger(run_id)

    def on_event(self, event: Event) -> None:
        self._logger.log(event)
        print(f"  [{event.source} -> {event.target}] {event.type}")

    def on_agent_start(self, name: str, event: Event) -> None:
        print(f">>> {name} processing {event.type}")

    def on_agent_end(self, name: str, events: list[Event]) -> None:
        print(f"<<< {name} emitted {len(events)} events")

    def on_terminate(self, reason: TerminationReason) -> None:
        self._logger.log_termination(reason)
        print(f"=== Terminated: {reason}")


# 3. Configure tools and runtime
executor = ToolExecutor(
    file_tool=FileEditTool(workdir=Path("./repo")),
    shell_tool=ShellExecTool(
        workdir=Path("./repo"),
        command_allowlist=["pytest", "ruff"],
    ),
)

llm = MockLLM()
runtime = LocalRuntime(
    llm,
    observer=PrintObserver("run-001"),
    tool_executor=executor,
    max_tool_rounds=5,
)

# 4. Run
result = asyncio.run(runtime.run(society, "Fix the failing test in test_parser.py"))

# 5. Inspect results
print(f"Status: {result.status}")
print(f"Rounds: {result.rounds}")
print(f"LLM calls: {result.total_llm_calls}")
print(f"Events: {len(result.trace)}")
for name, content in result.artifacts.items():
    print(f"Artifact '{name}': {len(content)} chars")