Runtime
Core Runtime
base
Runtime protocols and result types.
Defines the Runtime protocol, SocietyResult, TerminationReason, RuntimeObserver, IrreversibleAction, and a minimal LLM protocol that Phase 5 will expand.
TerminationReason
Bases: StrEnum
Why a society run terminated.
IrreversibleAction(type, agent, timestamp=datetime.now(), details=dict())
dataclass
Record of an external side effect that cannot be rolled back.
Attributes:
| Name | Type | Description |
|---|---|---|
| type | str | Category (e.g. "github_comment_posted", "commit_pushed"). |
| agent | str | Name of the agent that performed the action. |
| timestamp | datetime | When the action occurred. |
| details | dict[str, Any] | Structured details about the action. |
SocietyResult(status, termination, artifacts=dict(), trace=list(), rounds=0, total_llm_calls=0, run_id='', irreversible_actions=list())
dataclass
Result of running a society.
Attributes:
| Name | Type | Description |
|---|---|---|
| status | str | Summary status string. |
| termination | TerminationReason | Why the run ended. |
| artifacts | dict[str, str] | Final artifact state (name -> content snapshot). |
| trace | list[Event] | Full event trace in order. |
| rounds | int | Number of drain loop iterations. |
| total_llm_calls | int | Number of LLM API calls made. |
| run_id | str | Correlation ID for this society run (links to structured logs). |
| irreversible_actions | list[IrreversibleAction] | External side effects that occurred. |
RuntimeObserver
Bases: Protocol
Observer for runtime lifecycle events (e.g. dashboard integration).
Runtime
Bases: Protocol
Protocol for society execution engines.
LocalRuntime
local
LocalRuntime — single-process async execution engine for agent societies.
Implements the compile→seed→drain→return loop from the design doc §6.3. Agents execute in parallel when their events are independent (no shared edges, no overlapping artifact writes).
AgentOutput(events=list(), write_actions=list(), llm_response=None, llm_call_count=1)
dataclass
Result of executing an agent for one event.
Attributes:
| Name | Type | Description |
|---|---|---|
| events | list[Event] | New events to push onto the EventBus. |
| write_actions | list[WriteAction] | Artifact writes to apply. |
| llm_response | LLMResponse \| None | The raw LLM response (final response). |
| llm_call_count | int | Number of LLM calls made (including ReAct rounds). |
LocalRuntime(llm, observer=None, tool_executor=None, max_tool_rounds=10, circuit_breaker=None, watchdog=None, human_interfaces=None, comment_watchers=None)
Single-process async runtime for executing agent societies.
Implements the compile→seed→drain→return loop.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| llm | LLM | LLM backend for agent execution. | required |
| observer | RuntimeObserver \| None | Optional lifecycle observer (e.g. for dashboard). | None |
| tool_executor | ToolExecutor \| None | Optional ToolExecutor for real tool execution in the ReAct loop. When None, tool calls are converted to events (backward-compatible behavior). | None |
| max_tool_rounds | int | Maximum ReAct loop iterations per agent turn. Prevents infinite tool-calling loops. Defaults to 10. | 10 |
| circuit_breaker | CircuitBreaker \| None | Optional circuit breaker to trip after N consecutive LLM failures, preventing further calls from burning through retries on a degraded backend. | None |
| watchdog | Watchdog \| None | Optional watchdog to enforce per-agent-turn timeouts. When provided, each agent execution is wrapped with | None |
| human_interfaces | dict[str, Any] \| None | Mapping of agent name to HumanInterface implementation. Human agents ( | None |
| comment_watchers | list[Any] \| None | Optional list of watcher objects that monitor external sources (e.g. GitHub PR comments) and inject events into the running society via | None |
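The circuit_breaker parameter is described as tripping after N consecutive LLM failures. The following is a minimal sketch of that idea only; ConsecutiveFailureBreaker, CircuitOpenError, call_with_breaker, and failure_threshold are hypothetical names chosen for illustration and are not the library's CircuitBreaker API.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Raised when a call is refused because the breaker is open (hypothetical)."""


class ConsecutiveFailureBreaker:
    """Hypothetical breaker; not the library's CircuitBreaker class."""

    def __init__(self, failure_threshold: int = 3) -> None:
        if failure_threshold < 1:
            raise ValueError("failure_threshold must be at least 1")
        self.failure_threshold = failure_threshold
        self._consecutive_failures = 0

    @property
    def is_open(self) -> bool:
        # Open means "stop calling the backend".
        return self._consecutive_failures >= self.failure_threshold

    def record_success(self) -> None:
        # Any success resets the streak, so only consecutive failures count.
        self._consecutive_failures = 0

    def record_failure(self) -> None:
        self._consecutive_failures += 1


def call_with_breaker(breaker: ConsecutiveFailureBreaker, fn: Callable[[], T]) -> T:
    """Run fn() unless the breaker is open, and record the outcome."""
    if breaker.is_open:
        raise CircuitOpenError("too many consecutive failures; call skipped")
    try:
        result = fn()
    except Exception:
        breaker.record_failure()
        raise
    breaker.record_success()
    return result
```

Once the threshold is reached, call_with_breaker refuses to invoke fn at all, which is the "stop burning retries" behavior the parameter description refers to. How the real breaker resets is not stated in this reference.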
run(society, task, artifacts=None)
async
Execute a society to completion.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| society | Society | The society graph to execute. | required |
| task | str | Task description to seed the first event. | required |
| artifacts | list[Any] \| None | Optional list of ArtifactProtocol objects to pre-register in the artifact store. | None |
Returns:
| Type | Description |
|---|---|
| SocietyResult | SocietyResult with trace and final artifact state. |
check_termination(society, bus, trace, llm_calls, start_time, resolved_edges)
Check if the society run should terminate.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| society | Society | The society graph. | required |
| bus | EventBus | The event bus (check if empty). | required |
| trace | list[Event] | Full event trace so far. | required |
| llm_calls | int | Total LLM calls made. | required |
| start_time | datetime | When the run started. | required |
| resolved_edges | set[str] | Set of edge IDs that have been resolved. | required |
Returns:
| Type | Description |
|---|---|
| TerminationReason \| None | TerminationReason if the run should stop, None otherwise. |
find_entry_agents(society)
Find agents that should receive initial task_assigned events.
Entry agents are those that are only targets of Delegation edges (workers) or have no incoming directed edges. For non-directed edges (Cooperation, Competition, Coopetition), all members are potential entry points.
In practice, the delegator (source) in the outermost Delegation edge is the entry point. If there are no Delegation edges, all agents are entry points.
parse_llm_output(response, agent, event, edge)
Parse an LLM response into events and artifact actions.
Tool calls are mapped to events or artifact writes:
- emit_event(event_type, target_agent, data) → Event
- write_artifact(name, content) → WriteAction
- Any other tool call → Event with that tool name as event type
classify_tool_calls(tool_calls)
Classify tool calls into executable tools and claw actions.
Executable tools (file_edit, shell_exec, github) are dispatched to the ToolExecutor. Claw actions (emit_event, write_artifact) and unknown tools are parsed into events/writes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tool_calls | list[ToolCall] | Tool calls from an LLM response. | required |
Returns:
| Type | Description |
|---|---|
| tuple[list[ToolCall], list[ToolCall]] | Tuple of (executable_tool_calls, claw_action_calls). |
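The partition described for classify_tool_calls can be sketched as follows. This is an illustration under assumptions, not the library's implementation: the ToolCall dataclass here is a hypothetical stand-in with only a name and arguments, and treating the three listed tool names as an exact set is also an assumption.

```python
from dataclasses import dataclass, field
from typing import Any

# Tool names the description lists as executable; treating them as an exact
# set of names is an assumption of this sketch.
EXECUTABLE_TOOLS = frozenset({"file_edit", "shell_exec", "github"})


@dataclass
class ToolCall:
    """Hypothetical stand-in for the library's ToolCall type."""

    name: str
    arguments: dict[str, Any] = field(default_factory=dict)


def classify_tool_calls_sketch(
    tool_calls: list[ToolCall],
) -> tuple[list[ToolCall], list[ToolCall]]:
    """Split calls into (executable_tool_calls, claw_action_calls)."""
    executable: list[ToolCall] = []
    claw_actions: list[ToolCall] = []
    for call in tool_calls:
        if call.name in EXECUTABLE_TOOLS:
            executable.append(call)
        else:
            # emit_event, write_artifact, and unknown tools all land here.
            claw_actions.append(call)
    return executable, claw_actions
```

Order within each list follows the order of the LLM response, so later stages can replay the calls in the sequence the model issued them.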
EventBus
event_bus
EventBus — edge-routed event delivery with batch independence detection.
The EventBus queues events and pops independent batches for parallel execution. Two events are independent iff their target agents share no edges and no predicted artifact writes.
EventBus(society)
Async-compatible event queue with edge-routed delivery.
Maintains a FIFO queue of events and provides batch popping that respects agent independence constraints.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| society | Society | The society graph, used for edge adjacency checks. | required |
push(event)
Enqueue an event for delivery.
inject(event)
Inject an external event into the bus.
Unlike push(), which is called from inside the runtime drain
loop, inject() is safe to call from any thread or coroutine
(e.g. a GitHub comment watcher) while the society is running.
Injected events are drained into the main queue at the start of
the next pop_independent_batch() call.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | Event | The event to inject. | required |
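One way to get the push()/inject() split described above is to keep a thread-safe inbox next to the main FIFO and drain the inbox at the start of each pop. The sketch below assumes that design; InjectableQueueSketch and inject_demo are hypothetical names, events are plain strings, and pop() returns one event rather than a batch.

```python
import queue
import threading
from collections import deque
from typing import Optional


class InjectableQueueSketch:
    """Hypothetical model: a main FIFO plus a thread-safe inbox."""

    def __init__(self) -> None:
        self._main = deque()               # touched only by the drain loop
        self._inbox = queue.SimpleQueue()  # safe to fill from any thread

    def push(self, event: str) -> None:
        # Called from inside the drain loop only.
        self._main.append(event)

    def inject(self, event: str) -> None:
        # Called from outside the drain loop (e.g. a watcher thread).
        self._inbox.put(event)

    def pop(self) -> Optional[str]:
        # Move injected events into the main queue before popping.
        while True:
            try:
                self._main.append(self._inbox.get_nowait())
            except queue.Empty:
                break
        return self._main.popleft() if self._main else None


def inject_demo() -> list:
    bus = InjectableQueueSketch()
    bus.push("task_assigned")
    watcher = threading.Thread(target=bus.inject, args=("pr_comment",))
    watcher.start()
    watcher.join()
    return [bus.pop(), bus.pop(), bus.pop()]
```

Because the main deque is only ever touched by the drain loop, it needs no lock; only the inbox has to be thread-safe.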
empty()
Check if the queue has no pending events.
pop_independent_batch()
Pop a batch of events whose targets can execute in parallel.
Two events are independent iff their target agents:
1. Share no edges in the society graph, AND
2. Have no overlapping predicted artifact write sets.
Returns the maximal independent set (greedy, first-come priority). Returns an empty list if the queue is empty.
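The greedy, first-come selection described above can be sketched like this. It is an illustration under assumptions rather than the EventBus code: PendingEvent is a hypothetical event type, edges are modeled as unordered agent pairs, and two events aimed at the same agent are treated as dependent.

```python
from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class PendingEvent:
    """Hypothetical event: a target agent plus its predicted artifact writes."""

    target: str
    predicted_writes: frozenset = frozenset()


def _independent(a: PendingEvent, b: PendingEvent, edges: set) -> bool:
    if a.target == b.target:
        return False  # assumption: one agent handles one event per batch
    if frozenset({a.target, b.target}) in edges:
        return False  # the two target agents share an edge
    return not (a.predicted_writes & b.predicted_writes)


def pop_independent_batch_sketch(pending: deque, edges: set) -> list:
    """Greedily take events, in queue order, that conflict with none already taken."""
    batch = []
    deferred = deque()
    while pending:
        event = pending.popleft()
        if all(_independent(event, chosen, edges) for chosen in batch):
            batch.append(event)
        else:
            deferred.append(event)
    # Events that did not fit stay queued, in their original relative order.
    pending.extend(deferred)
    return batch
```

The result is maximal in the sense that no deferred event could be added without a conflict; it is not guaranteed to be the largest possible independent set.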
peek()
Return all pending events without removing them.
Artifact Store
artifact_store
ArtifactStore — registry and batch write manager for artifacts.
The store holds all live artifact instances during a society run. It provides name-based lookup, version snapshots, and concurrent-write conflict detection within batches.
Example::

    from claw.artifact import StringArtifact
    from claw.runtime.artifact_store import ArtifactStore, WriteAction

    # Register an artifact so it can be looked up by name.
    store = ArtifactStore()
    doc = StringArtifact("design-doc")
    store.register(doc)

    # Writes are applied inside a batch so conflicts can be detected.
    store.begin_batch()
    store.apply(WriteAction("design-doc", "Hello", "writer"))
    store.end_batch()

WriteAction(artifact_name, content, agent)
dataclass
A request to write content to a named artifact.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| artifact_name | str | Target artifact identifier. | required |
| content | str | Content to write (string for StringArtifact, JSON for JsonArtifact). | required |
| agent | str | Name of the agent performing the write. | required |
ArtifactConflictError(artifact_name, agents)
Bases: Exception
Raised when two agents write to the same artifact within one batch.
ArtifactStore()
Registry of live artifacts with batch conflict detection.
Usage::

    store = ArtifactStore()
    store.register(my_artifact)
    store.begin_batch()
    store.apply(WriteAction("name", "content", "agent-a"))
    store.end_batch()

If two apply() calls from different agents target the same artifact within a single batch, ArtifactConflictError is raised on the second write.
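The per-batch conflict rule can be modeled with a map from artifact name to the agent that wrote it in the current batch. The sketch below is a simplified stand-in, not the ArtifactStore implementation: BatchWriteTrackerSketch and ConflictError are hypothetical names, artifacts are plain strings, and apply() takes three arguments instead of a WriteAction.

```python
class ConflictError(Exception):
    """Hypothetical stand-in for ArtifactConflictError."""


class BatchWriteTrackerSketch:
    """Hypothetical store: string artifacts plus per-batch writer tracking."""

    def __init__(self) -> None:
        self._content = {}  # artifact name -> current content
        self._writers = {}  # artifact name -> agent, current batch only

    def register(self, name: str, content: str = "") -> None:
        if name in self._content:
            raise ValueError(f"artifact {name!r} already registered")
        self._content[name] = content

    def read(self, name: str) -> str:
        return self._content[name]

    def begin_batch(self) -> None:
        self._writers.clear()

    def apply(self, name: str, content: str, agent: str) -> None:
        if name not in self._content:
            raise KeyError(name)
        previous = self._writers.get(name)
        if previous is not None and previous != agent:
            # A different agent already wrote this artifact in this batch.
            raise ConflictError(f"{name!r} written by {previous!r} and {agent!r}")
        self._writers[name] = agent
        self._content[name] = content

    def end_batch(self) -> None:
        self._writers.clear()
```

Since tracking is cleared at both begin_batch() and end_batch(), the same two agents can write the same artifact without error as long as the writes fall in different batches.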
artifacts
property
All registered artifacts (copy).
register(artifact)
Register an artifact. Raises ValueError if name already taken.
get(name)
Look up an artifact by name. Returns None if not found.
snapshot()
Return current version of every registered artifact.
begin_batch()
Start a new write batch. Clears conflict tracking.
apply(action)
Apply a write action, checking for conflicts within the batch.
Raises:
| Type | Description |
|---|---|
| KeyError | If the artifact is not registered. |
| ArtifactConflictError | If the same artifact was already written in this batch by a different agent. |
end_batch()
End the current batch. Clears conflict tracking.
checkpoint()
Capture current artifact content as a restore point.
Returns:
| Type | Description |
|---|---|
| dict[str, str] | Dict mapping artifact name to its current content (via read()). |
rollback(snapshot)
Restore artifacts to a previous state.
For each artifact in the snapshot, writes the stored content back with agent="system:rollback". Artifacts not in the snapshot are left unchanged.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| snapshot | dict[str, str] | Dict from checkpoint(). | required |
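A small sketch of the checkpoint/rollback contract described above, written against a hypothetical TextArtifact class with read() and write(content, agent); the real artifact types and store internals may differ.

```python
class TextArtifact:
    """Hypothetical artifact exposing read() and write(content, agent)."""

    def __init__(self, name: str, content: str = "") -> None:
        self.name = name
        self._content = content
        self.last_writer = None

    def read(self) -> str:
        return self._content

    def write(self, content: str, agent: str) -> None:
        self._content = content
        self.last_writer = agent


def checkpoint_sketch(artifacts: dict) -> dict:
    """Capture name -> current content for every artifact."""
    return {name: artifact.read() for name, artifact in artifacts.items()}


def rollback_sketch(artifacts: dict, snapshot: dict) -> None:
    """Write snapshot content back; artifacts absent from it are untouched."""
    for name, content in snapshot.items():
        artifacts[name].write(content, agent="system:rollback")
```

An artifact registered after the checkpoint was taken keeps whatever content it has at rollback time, because the loop only visits names present in the snapshot.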
Termination Tracking
termination
Termination — per-edge and global termination logic.
Tracks which edges have been resolved and determines when the society run should terminate.
EdgeResolutionTracker()
Track per-edge resolution state.
Each edge type has specific events that trigger resolution:
- Oversight: approve or reject
- Delegation: accept, reject, or complete
- Cooperation: all agents signal complete
- Competition: all competitors submit → judge resolves
- Coopetition: agreement reached or max_rounds
Persistence
persistence
Event persistence — append-only event log for observability and replay.
Writes events to .claw/runs/{run_id}.jsonl as they occur. Supports
loading a completed run's event stream for dashboard history replay.
EventLogger(run_id, base_dir='.claw/runs')
Append-only event logger writing to a JSONL file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| run_id | str | Unique identifier for this run. | required |
| base_dir | Path \| str | Directory where run logs are stored. | '.claw/runs' |
load_run(run_id, base_dir='.claw/runs')
Load a completed run's event stream from disk.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| run_id | str | The run ID to load. | required |
| base_dir | Path \| str | Directory where run logs are stored. | '.claw/runs' |
Returns:
| Type | Description |
|---|---|
| list[dict] | List of log entries (events, snapshots, termination). |
Raises:
| Type | Description |
|---|---|
| FileNotFoundError | If the run log doesn't exist. |
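The append-only JSONL layout described in this section can be sketched with the standard library alone. The function names and the entry fields ("kind", "type", "reason") below are assumptions made for illustration; the actual log schema is not given in this reference.

```python
import json
import tempfile
from pathlib import Path
from typing import Union


def run_log_path(run_id: str, base_dir: Union[Path, str] = ".claw/runs") -> Path:
    return Path(base_dir) / f"{run_id}.jsonl"


def append_entry(run_id: str, entry: dict, base_dir: Union[Path, str] = ".claw/runs") -> None:
    """Append one JSON object per line; earlier lines are never rewritten."""
    path = run_log_path(run_id, base_dir)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as log:
        log.write(json.dumps(entry) + "\n")


def load_run_sketch(run_id: str, base_dir: Union[Path, str] = ".claw/runs") -> list:
    """Read entries back in write order; raises FileNotFoundError if absent."""
    with run_log_path(run_id, base_dir).open("r", encoding="utf-8") as log:
        return [json.loads(line) for line in log if line.strip()]


def persistence_demo() -> list:
    # Uses a temporary directory so the example leaves nothing on disk.
    with tempfile.TemporaryDirectory() as tmp:
        append_entry("run-1", {"kind": "event", "type": "task_assigned"}, tmp)
        append_entry("run-1", {"kind": "termination", "reason": "example"}, tmp)
        return load_run_sketch("run-1", tmp)
```

One object per line keeps appends cheap and lets a reader replay a run by iterating the file from top to bottom.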
Watchdog
watchdog
Watchdog — monitors agent execution time and enforces timeouts.
The Watchdog tracks wall-clock time per active agent turn and cancels execution if timeouts are exceeded. It supports per-edge and global timeout policies.
TimeoutAction
Bases: StrEnum
What to do when a timeout occurs.
Watchdog(default_timeout=timedelta(seconds=120))
Monitors agent execution time and enforces timeouts.
Attributes:
| Name | Type | Description |
|---|---|---|
| default_timeout | timedelta | Global per-turn timeout for any agent. |
timed_out_agents
property
Return list of agent names that timed out.
start_turn(agent_name)
Record that an agent's turn has started.
end_turn(agent_name)
Record that an agent's turn has completed.
check_timeout(agent_name, edge_timeout=None)
Check if an agent's turn has exceeded its timeout.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent_name | str | The agent to check. | required |
| edge_timeout | timedelta \| None | Per-edge timeout override. Falls back to the global default if None. | None |
Returns:
| Type | Description |
|---|---|
| bool | True if the agent has timed out. |
execute_with_timeout(coro, agent_name, edge_timeout=None)
async
Execute a coroutine with a timeout.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| coro | Any | The coroutine to execute. | required |
| agent_name | str | Name of the agent being executed. | required |
| edge_timeout | timedelta \| None | Per-edge timeout override. | None |
Returns:
| Type | Description |
|---|---|
| tuple[object, bool] | Tuple of (result, timed_out). If timed_out is True, result is None. |
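The (result, timed_out) contract above maps naturally onto asyncio.wait_for. The following is a sketch under assumptions, not the Watchdog implementation: the function is free-standing, and the default_timeout and timed_out_agents parameters are hypothetical stand-ins for state the Watchdog would hold itself.

```python
import asyncio
from datetime import timedelta
from typing import Any, Optional

DEFAULT_TIMEOUT = timedelta(seconds=120)


async def execute_with_timeout_sketch(
    coro: Any,
    agent_name: str,
    edge_timeout: Optional[timedelta] = None,
    default_timeout: timedelta = DEFAULT_TIMEOUT,
    timed_out_agents: Optional[list] = None,
) -> tuple:
    """Return (result, False) on success or (None, True) on timeout."""
    # A per-edge override wins; otherwise fall back to the global default.
    limit = edge_timeout if edge_timeout is not None else default_timeout
    try:
        result = await asyncio.wait_for(coro, timeout=limit.total_seconds())
    except asyncio.TimeoutError:
        # wait_for cancels the awaited coroutine before raising.
        if timed_out_agents is not None:
            timed_out_agents.append(agent_name)
        return None, True
    return result, False


async def timeout_demo() -> tuple:
    async def fast() -> str:
        return "ok"

    async def slow() -> str:
        await asyncio.sleep(1)
        return "late"

    timed_out = []
    first = await execute_with_timeout_sketch(fast(), "writer")
    second = await execute_with_timeout_sketch(
        slow(),
        "reviewer",
        edge_timeout=timedelta(milliseconds=10),
        timed_out_agents=timed_out,
    )
    return first, second, timed_out
```

Returning a flag instead of raising lets the caller treat a timeout as an ordinary outcome of the turn and decide separately what action to take.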