Runtime Guide¶
The Runtime is the execution engine that brings a Society graph to life. You define agents, edges, and artifacts declaratively; the runtime compiles the graph, seeds initial events, drains the event queue in parallel batches, and returns a structured result.
Claw ships with LocalRuntime, a single-process async engine suitable for development, testing, and moderate-scale production use. The Runtime protocol is open for extension (e.g., distributed runtimes).
Execution Model¶
Every runtime.run() call follows the same four-phase loop:
1. Compile¶
The society graph is validated and compiled. Compilation checks that all edges reference valid agents, that artifacts are well-formed, and that the graph is structurally sound. Per-agent system prompts, context blocks, and available tool definitions are generated from the graph topology.
# Internally, runtime calls:
from claw.compiler.compile import compile as compile_society
compile_society(society)
If validation fails, the runtime raises before any LLM calls are made.
2. Seed¶
The runtime identifies entry agents -- agents that should receive the initial task. Entry detection works as follows:
- If the graph has Delegation edges, the delegator (source of the outermost Delegation) is the entry point.
- If there are no Delegation edges, all agents receive the initial task.
Each entry agent gets a task_assigned event pushed onto the EventBus:
Event(
type="task_assigned",
source="system",
target=agent.name,
edge_id=edges[0].edge_id,
data={"task": task},
)
3. Drain¶
The drain loop is where agents actually execute. On each iteration:
- Check termination conditions (budget, timeout, all edges resolved, empty queue).
- Pop an independent batch from the EventBus -- events whose target agents can safely execute in parallel.
- Execute all agents in the batch concurrently via
asyncio.gather. - Process results: apply artifact writes, push new events back onto the bus, track edge resolution.
- Repeat until a termination condition is met.
while not terminated:
batch = bus.pop_independent_batch()
results = await asyncio.gather(*[execute(event) for event in batch])
for result in results:
apply_artifact_writes(result)
push_new_events(result)
4. Return¶
Once the loop terminates, the runtime builds a SocietyResult containing the full event trace, final artifact snapshots, and execution statistics.
LocalRuntime¶
LocalRuntime is the primary runtime implementation. It runs everything in a single Python process using asyncio for concurrency.
Basic Usage¶
from claw import LocalRuntime, MockLLM
llm = MockLLM()
runtime = LocalRuntime(llm)
result = await runtime.run(society, "Do the task")
Constructor Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
llm |
LLM |
required | LLM backend for agent execution (e.g., MockLLM, LiteLLMBackend). |
observer |
RuntimeObserver \| None |
None |
Lifecycle observer for monitoring events, agent starts/ends, and termination. |
tool_executor |
ToolExecutor \| None |
None |
Enables real tool execution in the ReAct loop. When None, all tool calls are converted to events. |
max_tool_rounds |
int |
10 |
Maximum iterations of the ReAct loop per agent turn. Prevents infinite tool-calling loops. |
The run() Method¶
async def run(
self,
society: Society,
task: str,
artifacts: list[ArtifactProtocol] | None = None,
) -> SocietyResult:
Arguments:
- society -- The society graph to execute.
- task -- A string describing the task to accomplish.
- artifacts -- Optional list of pre-existing artifacts to register in the store before execution begins.
Returns: A SocietyResult (see below).
Pre-registering Artifacts¶
If your society works with artifacts that already have content (e.g., an existing document to review), pass them via the artifacts parameter:
from claw import StringArtifact, LocalRuntime, MockLLM
doc = StringArtifact("design-doc")
doc.write("# Design\nInitial draft...", author="setup")
runtime = LocalRuntime(MockLLM())
result = await runtime.run(society, "Review the design doc", artifacts=[doc])
SocietyResult¶
Every runtime.run() call returns a SocietyResult dataclass with full execution details.
result = await runtime.run(society, "task")
print(result.status) # "completed", "budget_exceeded", "timed_out", "deadlocked"
print(result.termination) # TerminationReason enum value
print(result.rounds) # Number of drain loop iterations
print(result.total_llm_calls) # Total LLM API calls across all agents
print(result.trace) # List[Event] -- full ordered event trace
print(result.artifacts) # Dict[str, str] -- artifact name to content snapshot
Fields¶
| Field | Type | Description |
|---|---|---|
status |
str |
Human-readable status: "completed", "budget_exceeded", "timed_out", "deadlocked". |
termination |
TerminationReason |
Enum value indicating why the run ended. |
artifacts |
dict[str, str] |
Final content snapshot of every registered artifact. |
trace |
list[Event] |
Every event that flowed through the system, in order. |
rounds |
int |
How many drain loop iterations occurred. |
total_llm_calls |
int |
Total number of LLM API calls made across all agents and ReAct rounds. |
irreversible_actions |
list[IrreversibleAction] |
External side effects (e.g., GitHub comments posted) that cannot be rolled back. |
Status Mapping¶
The status string is derived from the termination reason:
TerminationReason |
status |
|---|---|
ALL_EDGES_RESOLVED |
"completed" |
QUEUE_EMPTY |
"completed" |
EDGE_RESOLVED |
"completed" |
BUDGET_EXCEEDED |
"budget_exceeded" |
TIMEOUT |
"timed_out" |
DEADLOCK |
"deadlocked" |
Event Flow¶
Events are the unit of communication between agents. Unlike frameworks that broadcast globally, Claw routes events along edges.
Edge-Routed Delivery¶
When an agent emits an event, it travels along the edge connecting the source and target agents. The runtime automatically resolves the correct edge:
- If agent A emits an event targeting agent B, the runtime looks up the edge between A and B and sets the
edge_idaccordingly. - For group edges (Cooperation, Competition, Coopetition), the runtime checks group membership.
- This means agents on different edges never accidentally see each other's events.
Independent Batch Detection¶
The EventBus detects which events can execute in parallel. Two events are independent if and only if:
- Their target agents share no edges in the society graph, AND
- Their target agents have no overlapping artifact write sets.
The bus uses a greedy algorithm (first-come priority) to build the maximal independent batch on each drain iteration. Events that conflict with the batch are held for the next round.
# Conceptually:
batch = bus.pop_independent_batch()
# batch contains events whose targets can safely execute concurrently
This means that in a society with independent sub-graphs (e.g., two separate review pipelines), agents in different sub-graphs execute in parallel automatically.
Event Structure¶
Every event carries:
Event(
type="comment", # Event type identifier
source="reviewer", # Agent that produced the event
target="coder", # Agent that should receive the event
edge_id="edge-abc-123", # Edge this event travels along
data={"content": "..."}, # Structured payload
timestamp=..., # Auto-set creation time
sequence_id=..., # Monotonic counter for ordering
)
Termination Conditions¶
The runtime checks termination conditions at the start of every drain loop iteration. The run ends as soon as any condition is met.
| Reason | Enum Value | Meaning |
|---|---|---|
| All edges resolved | ALL_EDGES_RESOLVED |
Every edge in the society has been terminated by a resolution event (approve, reject, complete, submit, agreement, etc.). |
| Queue empty | QUEUE_EMPTY |
No more events to process. Agents have stopped emitting new work. |
| Budget exceeded | BUDGET_EXCEEDED |
Total LLM calls reached the max_llm_calls limit set in SocietyConfig. |
| Timeout | TIMEOUT |
Wall-clock time exceeded max_wall_time from SocietyConfig. |
| Deadlock | DEADLOCK |
The system detected a deadlock (events exist but cannot be processed). |
Configuring Limits¶
Termination limits are set on the society's configuration:
from datetime import timedelta
from claw import Society, SocietyConfig
society = Society(
"my-society",
config=SocietyConfig(
max_llm_calls=50, # Default: 100
max_wall_time=timedelta(minutes=10), # Default: 30 minutes
),
)
Edge Resolution¶
Different edge types are resolved by different events:
| Edge Type | Resolution Events |
|---|---|
| Oversight | approve, reject |
| Delegation | accept, reject, complete |
| Cooperation | complete (all agents must signal) |
| Competition | submit (all competitors must submit) |
| Coopetition | agreement, complete |
The EdgeResolutionTracker tracks per-edge state. For Cooperation and Competition edges, resolution requires all participating agents to signal -- a single agent signaling complete is not enough.
Tool Execution (ReAct Loop)¶
By default, LLM tool calls are converted into events. When you provide a ToolExecutor, the runtime enters a ReAct loop that executes real tools:
The loop continues until the agent emits a Claw action (like emit_event or write_artifact) or hits the max_tool_rounds limit.
Setting Up Tool Execution¶
from pathlib import Path
from claw import (
LocalRuntime,
MockLLM,
ToolExecutor,
FileEditTool,
ShellExecTool,
)
executor = ToolExecutor(
file_tool=FileEditTool(workdir=Path("./repo")),
shell_tool=ShellExecTool(
workdir=Path("./repo"),
command_allowlist=["pytest", "ruff"],
),
dry_run=False,
)
runtime = LocalRuntime(
MockLLM(),
tool_executor=executor,
max_tool_rounds=5,
)
result = await runtime.run(society, "Fix the failing tests")
Tool Call Classification¶
When the runtime receives tool calls from the LLM, it classifies them into two categories:
Executable tools -- dispatched to the ToolExecutor, results fed back to the LLM for the next ReAct iteration:
| Tool Name | Description |
|---|---|
file_edit |
Read, write, or modify files via FileEditTool. |
shell_exec |
Run shell commands via ShellExecTool. |
github |
Interact with GitHub (PRs, comments, etc.) via GitHubTool. |
Claw actions -- parsed into events or artifact writes, which exit the ReAct loop and flow back into the society:
| Action | Effect |
|---|---|
emit_event |
Creates a new Event on the EventBus. |
write_artifact |
Creates a WriteAction applied to the ArtifactStore. |
This classification means an agent can perform multiple rounds of file editing and test running before finally emitting an event to notify another agent.
Without a ToolExecutor¶
When no ToolExecutor is configured (the default), the runtime makes a single LLM call per agent turn. All tool calls in the response -- including file_edit and shell_exec -- are parsed as events. This is useful for simulation and testing where you do not need real side effects.
Dry Run¶
Use dry run mode to see what tools the agents would call without actually executing them:
from claw import ToolExecutor, FileEditTool
executor = ToolExecutor(
file_tool=FileEditTool(workdir=Path("./repo")),
dry_run=True,
)
# Tools log actions but don't execute them
# Output will show: [DRY RUN] file_edit({...})
In dry run mode, every tool call returns a successful ToolResult with a log message showing what would have been executed. The LLM still sees a "success" response, so the ReAct loop proceeds normally -- the agent just cannot observe real tool output.
Observer Pattern¶
The RuntimeObserver protocol lets you hook into runtime lifecycle events for monitoring, logging, or building dashboards.
Implementing an Observer¶
from claw import RuntimeObserver, TerminationReason
from claw.event import Event
class MyObserver(RuntimeObserver):
def on_event(self, event: Event) -> None:
print(f"Event: {event.type} from {event.source} to {event.target}")
def on_agent_start(self, agent_name: str, event: Event) -> None:
print(f"Agent {agent_name} starting on {event.type}")
def on_agent_end(self, agent_name: str, events: list[Event]) -> None:
print(f"Agent {agent_name} done, emitted {len(events)} events")
def on_terminate(self, reason: TerminationReason) -> None:
print(f"Run terminated: {reason}")
Attaching an Observer¶
from claw import LocalRuntime, MockLLM
runtime = LocalRuntime(MockLLM(), observer=MyObserver())
result = await runtime.run(society, "Do the task")
Observer Callbacks¶
| Callback | When Called | Arguments |
|---|---|---|
on_event(event) |
Every time an event is pushed or popped from the bus. | The Event object. |
on_agent_start(name, event) |
Before an agent begins processing an event. | Agent name and the triggering event. |
on_agent_end(name, events) |
After an agent finishes processing. | Agent name and list of events the agent emitted. |
on_terminate(reason) |
When the run ends. | The TerminationReason. |
Claw also ships with WebSocketObserver for the live dashboard (see the Dashboard Guide).
Persistence (EventLogger)¶
The EventLogger writes every event to an append-only JSONL file for later replay, debugging, or audit.
Logging a Run¶
from claw import EventLogger
logger = EventLogger(run_id="run-001", base_dir=".claw/runs")
# Log events as they occur (e.g., inside an observer):
logger.log(event)
# Log artifact snapshots:
logger.log_artifact_snapshot(name="design-doc", version=3, content="...")
# Log termination:
logger.log_termination(reason)
Each entry is a JSON object on its own line. The file is written to .claw/runs/{run_id}.jsonl.
Replaying a Run¶
from claw import load_run
entries = load_run("run-001", base_dir=".claw/runs")
for entry in entries:
if entry["entry_type"] == "event":
print(f"{entry['source']} -> {entry['target']}: {entry['type']}")
elif entry["entry_type"] == "artifact_snapshot":
print(f"Artifact {entry['artifact_name']} v{entry['version']}")
elif entry["entry_type"] == "termination":
print(f"Terminated: {entry['reason']}")
Entry Types¶
The JSONL log contains three kinds of entries:
entry_type |
Fields | Description |
|---|---|---|
"event" |
type, source, target, edge_id, data, timestamp, sequence_id |
A society event. |
"artifact_snapshot" |
artifact_name, version, content, timestamp |
A point-in-time snapshot of artifact content. |
"termination" |
reason, timestamp |
Why the run ended. |
Integrating with the Observer¶
A common pattern is to wire the EventLogger into a RuntimeObserver:
from claw import EventLogger, RuntimeObserver, TerminationReason
from claw.event import Event
class LoggingObserver(RuntimeObserver):
def __init__(self, run_id: str) -> None:
self._logger = EventLogger(run_id)
def on_event(self, event: Event) -> None:
self._logger.log(event)
def on_agent_start(self, agent_name: str, event: Event) -> None:
pass # Optional: log agent starts
def on_agent_end(self, agent_name: str, events: list[Event]) -> None:
pass # Optional: log agent ends
def on_terminate(self, reason: TerminationReason) -> None:
self._logger.log_termination(reason)
Watchdog (Agent Timeouts)¶
The Watchdog monitors per-agent execution time and cancels turns that exceed their timeout.
from datetime import timedelta
from claw import Watchdog
watchdog = Watchdog(default_timeout=timedelta(seconds=60))
# Execute a coroutine with a timeout:
result, timed_out = await watchdog.execute_with_timeout(
coro=some_agent_coroutine(),
agent_name="coder",
edge_timeout=timedelta(seconds=30), # Optional per-edge override
)
if timed_out:
print("Agent coder timed out!")
The global per-turn timeout defaults to 120 seconds and can be overridden per-edge. Timed-out agents are tracked and can be queried:
LLM Retry Behavior¶
The LocalRuntime includes app-level retry logic for malformed LLM outputs. If the LLM returns an empty response (no content and no tool calls), the runtime retries up to 3 times with exponential backoff (1s, 2s, 4s).
HTTP-level retries (rate limits, transient errors) are handled by the LLM backend itself (e.g., LiteLLMBackend delegates to litellm's built-in retry logic).
Full Example¶
Putting it all together -- a society with tool execution, an observer, and persistence:
import asyncio
from pathlib import Path
from claw import (
Agent,
Delegation,
EventLogger,
FileEditTool,
LocalRuntime,
MockLLM,
Oversight,
RuntimeObserver,
ShellExecTool,
Society,
SocietyConfig,
StringArtifact,
TerminationReason,
ToolExecutor,
)
from claw.event import Event
from datetime import timedelta
# 1. Build the society
society = Society(
"code-fix",
config=SocietyConfig(max_llm_calls=30, max_wall_time=timedelta(minutes=5)),
)
lead = Agent("lead", role="tech lead", model="claude-sonnet")
coder = Agent("coder", role="implementer", model="claude-sonnet")
reviewer = Agent("reviewer", role="code reviewer", model="claude-sonnet")
society.add(lead, coder, reviewer)
society.connect(lead, coder, Delegation())
society.connect(coder, reviewer, Oversight(max_rounds=2))
# 2. Set up observer with logging
class PrintObserver(RuntimeObserver):
def __init__(self, run_id: str) -> None:
self._logger = EventLogger(run_id)
def on_event(self, event: Event) -> None:
self._logger.log(event)
print(f" [{event.source} -> {event.target}] {event.type}")
def on_agent_start(self, name: str, event: Event) -> None:
print(f">>> {name} processing {event.type}")
def on_agent_end(self, name: str, events: list[Event]) -> None:
print(f"<<< {name} emitted {len(events)} events")
def on_terminate(self, reason: TerminationReason) -> None:
self._logger.log_termination(reason)
print(f"=== Terminated: {reason}")
# 3. Configure tools and runtime
executor = ToolExecutor(
file_tool=FileEditTool(workdir=Path("./repo")),
shell_tool=ShellExecTool(
workdir=Path("./repo"),
command_allowlist=["pytest", "ruff"],
),
)
llm = MockLLM()
runtime = LocalRuntime(
llm,
observer=PrintObserver("run-001"),
tool_executor=executor,
max_tool_rounds=5,
)
# 4. Run
result = asyncio.run(runtime.run(society, "Fix the failing test in test_parser.py"))
# 5. Inspect results
print(f"Status: {result.status}")
print(f"Rounds: {result.rounds}")
print(f"LLM calls: {result.total_llm_calls}")
print(f"Events: {len(result.trace)}")
for name, content in result.artifacts.items():
print(f"Artifact '{name}': {len(content)} chars")