Runtime

Core Runtime

base

Runtime protocols and result types.

Defines the Runtime protocol, SocietyResult, TerminationReason, RuntimeObserver, IrreversibleAction, and a minimal LLM protocol that Phase 5 will expand.

TerminationReason

Bases: StrEnum

Why a society run terminated.

IrreversibleAction(type, agent, timestamp=datetime.now(), details=dict()) dataclass

Record of an external side effect that cannot be rolled back.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| type | str | Category (e.g. "github_comment_posted", "commit_pushed"). |
| agent | str | Name of the agent that performed the action. |
| timestamp | datetime | When the action occurred. |
| details | dict[str, Any] | Structured details about the action. |
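The rendered signature shows `timestamp=datetime.now()` and `details=dict()`. A minimal sketch of how such per-instance defaults are usually declared, assuming they come from `default_factory` (the class name `IrreversibleActionSketch` is a hypothetical stand-in, not the library class):

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any


@dataclass
class IrreversibleActionSketch:
    """Hypothetical stand-in mirroring the documented fields."""

    type: str
    agent: str
    # default_factory runs once per instance, so each record gets its own timestamp.
    timestamp: datetime = field(default_factory=datetime.now)
    # A fresh dict per instance avoids sharing one mutable default.
    details: dict[str, Any] = field(default_factory=dict)


first = IrreversibleActionSketch("commit_pushed", "coder")
second = IrreversibleActionSketch("github_comment_posted", "reviewer")
first.details["sha"] = "abc123"  # does not leak into `second`
```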

SocietyResult(status, termination, artifacts=dict(), trace=list(), rounds=0, total_llm_calls=0, run_id='', irreversible_actions=list()) dataclass

Result of running a society.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| status | str | Summary status string. |
| termination | TerminationReason | Why the run ended. |
| artifacts | dict[str, str] | Final artifact state (name -> content snapshot). |
| trace | list[Event] | Full event trace in order. |
| rounds | int | Number of drain loop iterations. |
| total_llm_calls | int | Number of LLM API calls made. |
| run_id | str | Correlation ID for this society run (links to structured logs). |
| irreversible_actions | list[IrreversibleAction] | External side effects that occurred. |

RuntimeObserver

Bases: Protocol

Observer for runtime lifecycle events (e.g. dashboard integration).

Runtime

Bases: Protocol

Protocol for society execution engines.

LocalRuntime

local

LocalRuntime — single-process async execution engine for agent societies.

Implements the compile→seed→drain→return loop from the design doc §6.3. Agents execute in parallel when their events are independent (no shared edges, no overlapping artifact writes).

AgentOutput(events=list(), write_actions=list(), llm_response=None, llm_call_count=1) dataclass

Result of executing an agent for one event.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| events | list[Event] | New events to push onto the EventBus. |
| write_actions | list[WriteAction] | Artifact writes to apply. |
| llm_response | LLMResponse \| None | The raw LLM response (final response). |
| llm_call_count | int | Number of LLM calls made (including ReAct rounds). |

LocalRuntime(llm, observer=None, tool_executor=None, max_tool_rounds=10, circuit_breaker=None, watchdog=None, human_interfaces=None, comment_watchers=None)

Single-process async runtime for executing agent societies.

Implements the compile→seed→drain→return loop.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| llm | LLM | LLM backend for agent execution. | required |
| observer | RuntimeObserver \| None | Optional lifecycle observer (e.g. for dashboard). | None |
| tool_executor | ToolExecutor \| None | Optional ToolExecutor for real tool execution in the ReAct loop. When None, tool calls are converted to events (backward-compatible behavior). | None |
| max_tool_rounds | int | Maximum ReAct loop iterations per agent turn. Prevents infinite tool-calling loops. Defaults to 10. | 10 |
| circuit_breaker | CircuitBreaker \| None | Optional circuit breaker to trip after N consecutive LLM failures, preventing further calls from burning through retries on a degraded backend. | None |
| watchdog | Watchdog \| None | Optional watchdog to enforce per-agent-turn timeouts. When provided, each agent execution is wrapped with watchdog.execute_with_timeout(). If an agent exceeds the timeout, a timeout event is produced instead of the normal agent output. | None |
| human_interfaces | dict[str, Any] \| None | Mapping of agent name to HumanInterface implementation. Human agents (agent.human == True) are routed to their registered interface instead of the LLM. Raises ValueError at runtime if a human agent has no registered interface. | None |
| comment_watchers | list[Any] \| None | Optional list of watcher objects that monitor external sources (e.g. GitHub PR comments) and inject events into the running society via bus.inject(). Each watcher must have an async watch(bus) method. Watchers are started as background tasks after seeding and cancelled when the drain loop terminates. | None |
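The `circuit_breaker` parameter is described as tripping after N consecutive LLM failures. A rough sketch of that counting rule, under the assumption that a success resets the counter; `CircuitBreakerSketch` and its method names are hypothetical and do not reflect the real `CircuitBreaker` interface:

```python
class CircuitBreakerSketch:
    """Hypothetical breaker: opens after `threshold` consecutive failures."""

    def __init__(self, threshold=3):
        self.threshold = threshold
        self.consecutive_failures = 0

    @property
    def is_open(self):
        # Open means callers should stop sending requests to the backend.
        return self.consecutive_failures >= self.threshold

    def record_success(self):
        # Any success breaks the failure streak (an assumption of this sketch).
        self.consecutive_failures = 0

    def record_failure(self):
        self.consecutive_failures += 1


breaker = CircuitBreakerSketch(threshold=2)
breaker.record_failure()
breaker.record_success()  # streak reset
breaker.record_failure()
open_after_one = breaker.is_open
breaker.record_failure()
open_after_two = breaker.is_open
```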

run(society, task, artifacts=None) async

Execute a society to completion.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| society | Society | The society graph to execute. | required |
| task | str | Task description to seed the first event. | required |
| artifacts | list[Any] \| None | Optional list of ArtifactProtocol objects to pre-register in the artifact store. | None |

Returns:

| Type | Description |
| --- | --- |
| SocietyResult | SocietyResult with trace and final artifact state. |
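The seed, drain, and return phases of `run()` can be pictured with a small asyncio loop. This is a simplified sketch, not the LocalRuntime code: `run_sketch`, the `(agent, payload)` event tuples, and the `max_rounds` cap are all hypothetical, and every queued event is treated as one parallel batch instead of checking independence.

```python
import asyncio
from collections import deque


async def run_sketch(handlers, entry_agent, task, max_rounds=10):
    """Hypothetical seed -> drain -> return loop."""
    queue = deque([(entry_agent, task)])  # seed: first event for the entry agent
    trace, rounds = [], 0
    while queue and rounds < max_rounds:
        # Simplification: everything currently queued forms one batch.
        batch = list(queue)
        queue.clear()
        outputs = await asyncio.gather(
            *(handlers[agent](payload) for agent, payload in batch)
        )
        trace.extend(batch)
        for new_events in outputs:
            queue.extend(new_events)
        rounds += 1
    return {"trace": trace, "rounds": rounds}


async def planner(payload):
    # Emits one follow-up event for the worker.
    return [("worker", f"do: {payload}")]


async def worker(payload):
    # Produces no further events, which lets the queue drain.
    return []


result = asyncio.run(
    run_sketch({"planner": planner, "worker": worker}, "planner", "write docs")
)
```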

check_termination(society, bus, trace, llm_calls, start_time, resolved_edges)

Check if the society run should terminate.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| society | Society | The society graph. | required |
| bus | EventBus | The event bus (check if empty). | required |
| trace | list[Event] | Full event trace so far. | required |
| llm_calls | int | Total LLM calls made. | required |
| start_time | datetime | When the run started. | required |
| resolved_edges | set[str] | Set of edge IDs that have been resolved. | required |

Returns:

| Type | Description |
| --- | --- |
| TerminationReason \| None | TerminationReason if the run should stop, None otherwise. |

find_entry_agents(society)

Find agents that should receive initial task_assigned events.

Entry agents are those that are only targets of Delegation edges (workers) or have no incoming directed edges. For non-directed edges (Cooperation, Competition, Coopetition), all members are potential entry points.

In practice, the delegator (source) in the outermost Delegation edge is the entry point. If there are no Delegation edges, all agents are entry points.
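One way to read the "in practice" rule is sketched below. It assumes the outermost delegator is an agent that delegates but is never delegated to; the function name and the `(source, target)` tuple representation of Delegation edges are hypothetical.

```python
def find_entry_agents_sketch(agents, delegation_edges):
    """agents: list of names; delegation_edges: list of (source, target) pairs."""
    if not delegation_edges:
        # No Delegation edges: every agent is an entry point.
        return list(agents)
    sources = {source for source, _ in delegation_edges}
    targets = {target for _, target in delegation_edges}
    # Keep delegators that nobody delegates to (the top of the chain).
    return [a for a in agents if a in sources and a not in targets]


chain = find_entry_agents_sketch(
    ["lead", "manager", "coder"],
    [("lead", "manager"), ("manager", "coder")],
)
flat = find_entry_agents_sketch(["a", "b"], [])
```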

parse_llm_output(response, agent, event, edge)

Parse an LLM response into events and artifact actions.

Tool calls are mapped to events or artifact writes:

- emit_event(event_type, target_agent, data) → Event
- write_artifact(name, content) → WriteAction
- Any other tool call → Event with that tool name as event type
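The mapping above can be sketched as a simple dispatch on the tool name. `EventSketch`, `WriteSketch`, and the `(name, args)` tuple form of a tool call are hypothetical stand-ins for the real Event, WriteAction, and ToolCall types, whose fields are not shown on this page.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class EventSketch:
    type: str
    target: Optional[str] = None
    data: dict[str, Any] = field(default_factory=dict)


@dataclass
class WriteSketch:
    artifact_name: str
    content: str
    agent: str


def parse_tool_calls_sketch(tool_calls, agent):
    """tool_calls: list of (tool_name, arguments) pairs."""
    events, writes = [], []
    for name, args in tool_calls:
        if name == "emit_event":
            events.append(
                EventSketch(args["event_type"], args.get("target_agent"), args.get("data", {}))
            )
        elif name == "write_artifact":
            writes.append(WriteSketch(args["name"], args["content"], agent))
        else:
            # Unknown tool: the tool name itself becomes the event type.
            events.append(EventSketch(name, None, dict(args)))
    return events, writes


calls = [
    ("emit_event", {"event_type": "review_requested", "target_agent": "reviewer", "data": {"pr": 1}}),
    ("write_artifact", {"name": "design-doc", "content": "Hello"}),
    ("run_tests", {"path": "tests/"}),
]
events, writes = parse_tool_calls_sketch(calls, agent="writer")
```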

classify_tool_calls(tool_calls)

Classify tool calls into executable tools and claw actions.

Executable tools (file_edit, shell_exec, github) are dispatched to the ToolExecutor. Claw actions (emit_event, write_artifact) and unknown tools are parsed into events/writes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| tool_calls | list[ToolCall] | Tool calls from an LLM response. | required |

Returns:

| Type | Description |
| --- | --- |
| tuple[list[ToolCall], list[ToolCall]] | Tuple of (executable_tool_calls, claw_action_calls). |
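A minimal sketch of the partition described above, using plain tool names in place of ToolCall objects. The set of executable names is taken from the description; the function and constant names are hypothetical.

```python
EXECUTABLE_TOOLS = frozenset({"file_edit", "shell_exec", "github"})


def classify_tool_calls_sketch(tool_names):
    """Split tool names into (executable, claw_actions), preserving order."""
    executable, claw_actions = [], []
    for name in tool_names:
        if name in EXECUTABLE_TOOLS:
            executable.append(name)
        else:
            # emit_event, write_artifact, and unknown tools all land here.
            claw_actions.append(name)
    return executable, claw_actions


executable, claw_actions = classify_tool_calls_sketch(
    ["shell_exec", "emit_event", "github", "summarize"]
)
```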

EventBus

event_bus

EventBus — edge-routed event delivery with batch independence detection.

The EventBus queues events and pops independent batches for parallel execution. Two events are independent iff their target agents share no edges and no predicted artifact writes.

EventBus(society)

Async-compatible event queue with edge-routed delivery.

Maintains a FIFO queue of events and provides batch popping that respects agent independence constraints.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| society | Society | The society graph, used for edge adjacency checks. | required |

push(event)

Enqueue an event for delivery.

inject(event)

Inject an external event into the bus.

Unlike push(), which is called from inside the runtime drain loop, inject() is safe to call from any thread or coroutine (e.g. a GitHub comment watcher) while the society is running. Injected events are drained into the main queue at the start of the next pop_independent_batch() call.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event | Event | The event to inject. | required |
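The difference between push() and inject() can be illustrated with a lock-guarded staging queue that is drained before each pop. This is a sketch under assumptions: the class name, the `pop_next()` method, and the use of `threading.Lock` are hypothetical and may differ from how EventBus achieves thread safety.

```python
import threading
from collections import deque


class InjectableQueueSketch:
    def __init__(self):
        self._queue = deque()      # touched only by the drain loop
        self._injected = deque()   # staging area for external producers
        self._lock = threading.Lock()

    def push(self, event):
        self._queue.append(event)

    def inject(self, event):
        # Safe from other threads: only the staging deque is modified.
        with self._lock:
            self._injected.append(event)

    def pop_next(self):
        # Move staged events into the main queue before popping.
        with self._lock:
            while self._injected:
                self._queue.append(self._injected.popleft())
        return self._queue.popleft() if self._queue else None


bus = InjectableQueueSketch()
bus.push("task_assigned")
watcher = threading.Thread(target=bus.inject, args=("pr_comment",))
watcher.start()
watcher.join()
order = [bus.pop_next(), bus.pop_next(), bus.pop_next()]
```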

empty()

Check if the queue has no pending events.

pop_independent_batch()

Pop a batch of events whose targets can execute in parallel.

Two events are independent iff their target agents:

1. Share no edges in the society graph, AND
2. Have no overlapping predicted artifact write sets.

Returns a maximal independent set, built greedily in queue order (first-come priority). Returns an empty list if the queue is empty.
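The greedy selection can be sketched as a single pass over the queue. The data shapes here are hypothetical: events are `(event_id, target_agent)` tuples, edges are a set of two-agent frozensets, and predicted writes are a dict of sets. Treating two events for the same agent as conflicting is an assumption of this sketch.

```python
def pop_independent_batch_sketch(queue, edges, predicted_writes):
    """Remove and return a greedy batch of mutually independent events."""

    def conflicts(a, b):
        if a == b:
            return True  # assumption: one agent handles one event per batch
        shares_edge = frozenset((a, b)) in edges
        overlap = predicted_writes.get(a, set()) & predicted_writes.get(b, set())
        return shares_edge or bool(overlap)

    batch, remaining = [], []
    for event in queue:
        target = event[1]
        if all(not conflicts(target, chosen[1]) for chosen in batch):
            batch.append(event)      # earlier events win ties
        else:
            remaining.append(event)  # stays queued for a later batch
    queue[:] = remaining
    return batch


edges = {frozenset(("a", "b"))}
writes = {"c": {"design-doc"}, "d": {"design-doc"}}
queue = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]
batch = pop_independent_batch_sketch(queue, edges, writes)
```

Here agent b is held back because it shares an edge with a, and d is held back because it is predicted to write the same artifact as c.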

peek()

Return all pending events without removing them.

Artifact Store

artifact_store

ArtifactStore — registry and batch write manager for artifacts.

The store holds all live artifact instances during a society run. It provides name-based lookup, version snapshots, and concurrent-write conflict detection within batches.

Example:

from claw.artifact import StringArtifact
from claw.runtime.artifact_store import ArtifactStore, WriteAction

store = ArtifactStore()
doc = StringArtifact("design-doc")
store.register(doc)

store.begin_batch()
store.apply(WriteAction("design-doc", "Hello", "writer"))
store.end_batch()

WriteAction(artifact_name, content, agent) dataclass

A request to write content to a named artifact.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| artifact_name | str | Target artifact identifier. | required |
| content | str | Content to write (string for StringArtifact, JSON for JsonArtifact). | required |
| agent | str | Name of the agent performing the write. | required |

ArtifactConflictError(artifact_name, agents)

Bases: Exception

Raised when two agents write to the same artifact within one batch.

ArtifactStore()

Registry of live artifacts with batch conflict detection.

Usage:

store = ArtifactStore()
store.register(my_artifact)

store.begin_batch()
store.apply(WriteAction("name", "content", "agent-a"))
store.end_batch()

If two apply() calls by different agents target the same artifact within a single batch, ArtifactConflictError is raised on the second write.

artifacts property

All registered artifacts (copy).

register(artifact)

Register an artifact. Raises ValueError if name already taken.

get(name)

Look up an artifact by name. Returns None if not found.

snapshot()

Return current version of every registered artifact.

begin_batch()

Start a new write batch. Clears conflict tracking.

apply(action)

Apply a write action, checking for conflicts within the batch.

Raises:

| Type | Description |
| --- | --- |
| KeyError | If the artifact is not registered. |
| ArtifactConflictError | If the same artifact was already written in this batch by a different agent. |

end_batch()

End the current batch. Clears conflict tracking.
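The batch conflict rule can be sketched by remembering which agent wrote each artifact since the last begin_batch(). `BatchStoreSketch` and `ConflictSketchError` are hypothetical stand-ins that store plain strings; they are not the real ArtifactStore or ArtifactConflictError.

```python
class ConflictSketchError(Exception):
    """Hypothetical stand-in for ArtifactConflictError."""


class BatchStoreSketch:
    def __init__(self):
        self._content = {}
        self._writers = {}  # artifact name -> agent that wrote it in this batch

    def register(self, name, content=""):
        if name in self._content:
            raise ValueError(f"artifact {name!r} already registered")
        self._content[name] = content

    def begin_batch(self):
        self._writers.clear()

    def apply(self, name, content, agent):
        if name not in self._content:
            raise KeyError(name)
        previous = self._writers.get(name)
        if previous is not None and previous != agent:
            raise ConflictSketchError(f"{name!r} written by {previous} and {agent}")
        self._writers[name] = agent
        self._content[name] = content

    def end_batch(self):
        self._writers.clear()


store = BatchStoreSketch()
store.register("design-doc")
store.begin_batch()
store.apply("design-doc", "Hello", "agent-a")
try:
    store.apply("design-doc", "Goodbye", "agent-b")
    conflict_raised = False
except ConflictSketchError:
    conflict_raised = True
store.end_batch()
```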

checkpoint()

Capture current artifact content as a restore point.

Returns:

| Type | Description |
| --- | --- |
| dict[str, str] | Dict mapping artifact name to its current content (via read()). |

rollback(snapshot)

Restore artifacts to a previous state.

For each artifact in the snapshot, writes the stored content back with agent="system:rollback". Artifacts not in the snapshot are left unchanged.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| snapshot | dict[str, str] | Dict from checkpoint(). | required |
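The checkpoint and rollback behavior described above, including leaving artifacts outside the snapshot untouched, can be sketched with a dict-backed store. `SnapshotStoreSketch` and its `write`/`read` methods are hypothetical simplifications.

```python
class SnapshotStoreSketch:
    def __init__(self):
        self._content = {}
        self.last_writer = {}

    def write(self, name, content, agent):
        self._content[name] = content
        self.last_writer[name] = agent

    def read(self, name):
        return self._content[name]

    def checkpoint(self):
        # Copy, so later writes do not mutate the restore point.
        return dict(self._content)

    def rollback(self, snapshot):
        for name, content in snapshot.items():
            self.write(name, content, agent="system:rollback")


store = SnapshotStoreSketch()
store.write("design-doc", "v1", "writer")
saved = store.checkpoint()
store.write("design-doc", "v2", "writer")
store.write("notes", "scratch", "writer")  # created after the checkpoint
store.rollback(saved)
```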

Termination Tracking

termination

Termination — per-edge and global termination logic.

Tracks which edges have been resolved and determines when the society run should terminate.

EdgeResolutionTracker()

Track per-edge resolution state.

Each edge type has specific events that trigger resolution:

- Oversight: approve or reject
- Delegation: accept, reject, or complete
- Cooperation: all agents signal complete
- Competition: all competitors submit → judge resolves
- Coopetition: agreement reached or max_rounds
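Three of the rules above (Oversight, Delegation, Cooperation) can be sketched with a lookup table plus a per-edge set of agents that have signalled completion. The flat argument list, the lowercase edge type strings, and the class name are hypothetical; the real process_event(event, edge) takes Event and Edge objects.

```python
RESOLVING_EVENTS = {
    "oversight": {"approve", "reject"},
    "delegation": {"accept", "reject", "complete"},
}


class EdgeTrackerSketch:
    def __init__(self):
        self.resolved_edges = set()
        self._completed = {}  # edge_id -> agents that signalled complete

    def process_event(self, edge_id, edge_type, members, event_type, source):
        if edge_id in self.resolved_edges:
            return False  # already resolved; nothing new happens
        if edge_type == "cooperation":
            resolved = False
            if event_type == "complete":
                done = self._completed.setdefault(edge_id, set())
                done.add(source)
                resolved = done >= set(members)  # every member has signalled
        else:
            resolved = event_type in RESOLVING_EVENTS.get(edge_type, set())
        if resolved:
            self.resolved_edges.add(edge_id)
        return resolved


tracker = EdgeTrackerSketch()
first = tracker.process_event("e1", "cooperation", ["a", "b"], "complete", "a")
second = tracker.process_event("e1", "cooperation", ["a", "b"], "complete", "b")
approved = tracker.process_event("e2", "oversight", ["boss", "dev"], "approve", "boss")
```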

resolved_edges property

Return set of resolved edge IDs.

process_event(event, edge)

Process an event and check if it resolves the edge.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event | Event | The event to process. | required |
| edge | Edge \| GroupEdge \| None | The edge this event travels along. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the edge was resolved by this event. |

is_resolved(edge_id)

Check if a specific edge has been resolved.

Persistence

persistence

Event persistence — append-only event log for observability and replay.

Writes events to .claw/runs/{run_id}.jsonl as they occur. Supports loading a completed run's event stream for dashboard history replay.

EventLogger(run_id, base_dir='.claw/runs')

Append-only event logger writing to a JSONL file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| run_id | str | Unique identifier for this run. | required |
| base_dir | Path \| str | Directory where run logs are stored. | '.claw/runs' |

log_path property

Path to the JSONL log file.

log(event)

Append an event to the log file.

log_artifact_snapshot(name, version, content)

Log an artifact state snapshot.

log_termination(reason)

Log the final termination status.

load_run(run_id, base_dir='.claw/runs')

Load a completed run's event stream from disk.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| run_id | str | The run ID to load. | required |
| base_dir | Path \| str | Directory where run logs are stored. | '.claw/runs' |

Returns:

| Type | Description |
| --- | --- |
| list[dict] | List of log entries (events, snapshots, termination). |

Raises:

| Type | Description |
| --- | --- |
| FileNotFoundError | If the run log doesn't exist. |
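The append-only JSONL pattern behind EventLogger and load_run can be sketched with the standard library alone. The helper names and the entry shape (the `kind`, `type`, and `reason` keys and their values) are hypothetical; the actual on-disk schema is not documented here.

```python
import json
import tempfile
from pathlib import Path


def append_entry(log_path, entry):
    """Append one JSON object as a single line."""
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with log_path.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(entry) + "\n")


def load_entries(log_path):
    """Read every line back into a list of dicts."""
    if not log_path.exists():
        raise FileNotFoundError(log_path)
    with log_path.open(encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


# A temporary directory stands in for the project root.
base_dir = Path(tempfile.mkdtemp()) / ".claw" / "runs"
log_path = base_dir / "run-001.jsonl"
append_entry(log_path, {"kind": "event", "type": "task_assigned"})
append_entry(log_path, {"kind": "termination", "reason": "completed"})
entries = load_entries(log_path)
```

Because each entry is one line, a partially written run can still be read up to its last complete line, which suits replay in a dashboard.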

Watchdog

watchdog

Watchdog — monitors agent execution time and enforces timeouts.

The Watchdog tracks wall-clock time per active agent turn and cancels execution if timeouts are exceeded. It supports per-edge and global timeout policies.

TimeoutAction

Bases: StrEnum

What to do when a timeout occurs.

Watchdog(default_timeout=timedelta(seconds=120))

Monitors agent execution time and enforces timeouts.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| default_timeout | timedelta | Global per-turn timeout for any agent. |

timed_out_agents property

Return list of agent names that timed out.

start_turn(agent_name)

Record that an agent's turn has started.

end_turn(agent_name)

Record that an agent's turn has completed.

check_timeout(agent_name, edge_timeout=None)

Check if an agent's turn has exceeded its timeout.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| agent_name | str | The agent to check. | required |
| edge_timeout | timedelta \| None | Per-edge timeout override. Falls back to the global default if None. | None |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the agent has timed out. |

execute_with_timeout(coro, agent_name, edge_timeout=None) async

Execute a coroutine with a timeout.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| coro | Any | The coroutine to execute. | required |
| agent_name | str | Name of the agent being executed. | required |
| edge_timeout | timedelta \| None | Per-edge timeout override. | None |

Returns:

| Type | Description |
| --- | --- |
| tuple[object, bool] | Tuple of (result, timed_out). If timed_out is True, result is None. |
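The (result, timed_out) contract can be sketched with `asyncio.wait_for`. This is an assumption about the mechanism, not the Watchdog source; the function name is hypothetical and the agent_name bookkeeping is omitted.

```python
import asyncio
from datetime import timedelta


async def execute_with_timeout_sketch(coro, timeout):
    """Return (result, False) on success or (None, True) on timeout."""
    try:
        result = await asyncio.wait_for(coro, timeout=timeout.total_seconds())
    except asyncio.TimeoutError:
        # wait_for cancels the inner coroutine before raising.
        return None, True
    return result, False


async def fast_agent():
    return "done"


async def slow_agent():
    await asyncio.sleep(1)
    return "late"


async def demo():
    ok = await execute_with_timeout_sketch(fast_agent(), timedelta(seconds=1))
    late = await execute_with_timeout_sketch(slow_agent(), timedelta(milliseconds=10))
    return ok, late


ok, late = asyncio.run(demo())
```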