Axon is available through the object-oriented SDK surface in both TypeScript and Python. The examples on this page use RunloopSDK in TypeScript and AsyncRunloopSDK in Python.

Overview

Axon is Runloop’s real-time, distributed event bus for agent workflows. It stores interaction events in an append-only stream, assigns each event a monotonic sequence number, and exposes that stream to publishers, subscribers, and brokers.

Shared Journal

Coordinates work across users, agents, and external systems via an ordered event stream.

SQL Database

Embedded SQLite for structured state such as configuration, task queues, or key-value pairs.

Broker

Bridges Axons to agents running in devboxes, forwarding events one turn at a time.

Each event carries:
  • A source identifying where it came from (e.g. github, slack, my-agent)
  • An event type describing what happened (e.g. push, task_complete, review_requested)
  • An origin classifying who produced it (EXTERNAL_EVENT, AGENT_EVENT, USER_EVENT)
  • A payload containing the event data as a JSON string
  • A monotonic sequence number for ordering
Quick example:
import json
from runloop_api_client import AsyncRunloopSDK

runloop = AsyncRunloopSDK()
axon = await runloop.axon.create(name="assistant-session")

await axon.publish(
    source="dashboard",
    event_type="user.message",
    origin="USER_EVENT",
    payload=json.dumps({"content": "Summarize the latest PR feedback."}),
)

async with await axon.subscribe_sse() as stream:
    async for event in stream:
        print(event.sequence, event.event_type, event.payload)

How It Works

  1. Create an Axon — each Axon is a named event stream scoped to your account.
  2. Publish input events — users, orchestrators, webhooks, and other systems append structured events to the stream.
  3. Bridge through Broker — Broker reads incoming events and forwards them to the agent one turn at a time.
  4. Record agent output — broker-emitted events such as turn.message_chunk and turn.completed are appended back to the same Axon.
  5. Subscribe via SSE — clients observe the stream in order using sequence numbers.
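
The ordering model behind these steps can be sketched with a small in-memory stand-in. This is a conceptual toy, not Runloop's implementation: ToyJournal and ToyEvent are hypothetical names, and starting at 1 and stepping by 1 is an assumption made for the sketch (the source only states that sequence numbers are monotonic).

```python
import json
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ToyEvent:
    """Hypothetical stand-in for an event stored in the stream."""
    sequence: int
    source: str
    event_type: str
    origin: str
    payload: str  # JSON string, matching the documented event shape


@dataclass
class ToyJournal:
    """Append-only list that assigns a sequence number on every publish."""
    _events: list = field(default_factory=list)

    def publish(self, source, event_type, origin, payload):
        # Assumption for this toy: sequences start at 1 and step by 1.
        event = ToyEvent(len(self._events) + 1, source, event_type, origin, payload)
        self._events.append(event)
        return event.sequence

    def read_from(self, after_sequence=0):
        # Readers see events in order; nothing is ever mutated or removed.
        return [e for e in self._events if e.sequence > after_sequence]


journal = ToyJournal()
first = journal.publish("dashboard", "user.message", "USER_EVENT",
                        json.dumps({"content": "hello"}))
second = journal.publish("broker", "turn.completed", "SYSTEM_EVENT", json.dumps({}))
ordered = [e.event_type for e in journal.read_from()]
```

Input and output events share one list here, which mirrors how agent output is appended back to the same Axon that received the input.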

Quick Start

Step 1: Create an Axon

from runloop_api_client import AsyncRunloopSDK

runloop = AsyncRunloopSDK()
axon = await runloop.axon.create(name="my-channel")
print(f"Created axon: {axon.id}")

The name parameter is optional. If omitted, an unnamed axon is created.

Step 2: Publish Events

import json

result = await axon.publish(
    source="app",
    event_type="user.message",
    origin="USER_EVENT",
    payload=json.dumps({
        "content": "Review the latest failing build and suggest a fix.",
    }),
)

print(f"Published with sequence: {result.sequence}")

Each publish call returns a PublishResultView containing the assigned sequence number and timestamp_ms. Sequence numbers increase monotonically, so comparing two of them tells you which event was appended first.

Step 3: Subscribe to Events

async with await axon.subscribe_sse() as stream:
    async for event in stream:
        print(f"Seq {event.sequence}: [{event.source}] {event.event_type}")
        print(f"  Origin: {event.origin}")
        print(f"  Payload: {event.payload}")

The SSE stream delivers AxonEventView objects in sequence order. The stream stays open until you break out of the loop or the connection is closed.

Event Structure

Each event delivered via SSE includes the full event metadata stored in the Axon stream:

Field | Type | Description
sequence | number | Monotonically increasing sequence number
axon_id | string | The axon this event belongs to
timestamp_ms | number | Timestamp in milliseconds since epoch
origin | string | Event origin classification (see below)
source | string | Event source identifier (e.g. github, slack, my-agent)
event_type | string | Event type identifier (e.g. push, task_complete)
payload | string | JSON-encoded event payload
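
As a rough illustration of working with these fields, the sketch below copies them into a local record and decodes the two fields that need conversion. EventRecord and its methods are hypothetical helpers written for this example; they are not part of the SDK, and the sample values are made up.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class EventRecord:
    """Hypothetical local copy of the documented event fields."""
    sequence: int
    axon_id: str
    timestamp_ms: int
    origin: str
    source: str
    event_type: str
    payload: str

    def data(self):
        # payload is a JSON-encoded string, so decode it before use.
        return json.loads(self.payload)

    def timestamp(self):
        # timestamp_ms is milliseconds since the epoch; return an aware UTC datetime.
        return datetime.fromtimestamp(self.timestamp_ms / 1000, tz=timezone.utc)


# Illustrative values only.
record = EventRecord(
    sequence=7,
    axon_id="axn_abc123",
    timestamp_ms=1_700_000_000_000,
    origin="EXTERNAL_EVENT",
    source="github",
    event_type="push",
    payload=json.dumps({"ref": "refs/heads/main"}),
)
```

Converting to an aware UTC datetime avoids the local-timezone ambiguity of a naive datetime.fromtimestamp call.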

Event Origins

Origins classify who produced the event. When publishing, you can use three origin types:

Origin | Description | Example
EXTERNAL_EVENT | Events from external systems (webhooks, CI, third-party services) | GitHub push, Slack message
AGENT_EVENT | Events produced on behalf of an agent | Broker-published agent output such as turn.message_chunk
USER_EVENT | Events produced by a human user or user-facing application | user.message, manual approval

When subscribing, you may also receive:

Origin | Description
SYSTEM_EVENT | Events generated by the Runloop platform or broker lifecycle

SYSTEM_EVENT is a receive-only origin. You cannot publish events with this origin. In brokered flows, events such as turn.started, turn.completed, turn.cancelled, and turn.failed are typically emitted as system events.
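
Because SYSTEM_EVENT cannot be published, it may be convenient to check the origin on the client before calling publish. The following is a minimal sketch; check_publish_origin and the two constant sets are hypothetical names for this example, and the behavior of the service when it receives a disallowed origin is not covered here.

```python
PUBLISHABLE_ORIGINS = frozenset({"EXTERNAL_EVENT", "AGENT_EVENT", "USER_EVENT"})
RECEIVE_ONLY_ORIGINS = frozenset({"SYSTEM_EVENT"})


def check_publish_origin(origin: str) -> str:
    """Return origin unchanged if it may be published, else raise ValueError."""
    if origin in RECEIVE_ONLY_ORIGINS:
        raise ValueError(f"{origin} is receive-only and cannot be published")
    if origin not in PUBLISHABLE_ORIGINS:
        raise ValueError(f"unknown origin: {origin!r}")
    return origin
```

A caller could wrap the origin argument, for example origin=check_publish_origin("USER_EVENT"), so that a typo fails locally instead of after a network round trip.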

Use Cases

User-To-Agent Turns

Publish user input into an Axon, then subscribe for broker-published turn output from the attached agent.
import json
from runloop_api_client import AsyncRunloopSDK

runloop = AsyncRunloopSDK()
axon = await runloop.axon.create(name="assistant-turns")
stream = await axon.subscribe_sse()

await axon.publish(
    source="dashboard",
    event_type="user.message",
    origin="USER_EVENT",
    payload=json.dumps({
        "content": "Summarize the latest CI failures and group them by root cause.",
    }),
)

async with stream:
    async for event in stream:
        # Print each chunk of agent output as it arrives.
        if event.event_type == "turn.message_chunk":
            chunk = json.loads(event.payload)
            print(chunk["content"], end="")
        # Stop reading once the turn is reported as complete.
        elif event.event_type == "turn.completed":
            print("\nTurn complete")
            break
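
The loop above stops only on turn.completed. A variant that also stops on turn.cancelled and turn.failed, and returns the joined text instead of printing it, could look like the sketch below. collect_turn and fake_stream are hypothetical helpers; the fake stream replaces the SSE connection so the example runs offline, and the "content" key in chunk payloads is assumed from the example above.

```python
import asyncio
import json
from types import SimpleNamespace

TERMINAL_TYPES = {"turn.completed", "turn.cancelled", "turn.failed"}


async def collect_turn(events):
    """Join message chunks until a terminal turn event arrives.

    Returns (text, final_event_type). final_event_type is None if the
    stream ends before any terminal event is seen.
    """
    parts = []
    async for event in events:
        if event.event_type == "turn.message_chunk":
            # Assumes each chunk payload carries a "content" key.
            parts.append(json.loads(event.payload).get("content", ""))
        elif event.event_type in TERMINAL_TYPES:
            return "".join(parts), event.event_type
    return "".join(parts), None


async def fake_stream():
    # Stand-in for the SSE stream so the helper can run without a network.
    for event_type, body in [
        ("turn.started", {}),
        ("turn.message_chunk", {"content": "Two failures, "}),
        ("turn.message_chunk", {"content": "one root cause."}),
        ("turn.completed", {}),
    ]:
        yield SimpleNamespace(event_type=event_type, payload=json.dumps(body))


text, outcome = asyncio.run(collect_turn(fake_stream()))
```

Returning the terminal event type lets the caller tell a finished turn apart from a cancelled or failed one.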

Webhook And Automation Fan-In

Use a single Axon as the shared journal for events coming from external systems, then let downstream subscribers and brokers react in order.
import json
from runloop_api_client import AsyncRunloopSDK

runloop = AsyncRunloopSDK()
axon = await runloop.axon.create(name="repo-automation")

await axon.publish(
    source="github",
    event_type="push",
    origin="EXTERNAL_EVENT",
    payload=json.dumps({"ref": "refs/heads/main", "repository": "runloop"}),
)

await axon.publish(
    source="slack",
    event_type="incident.created",
    origin="EXTERNAL_EVENT",
    payload=json.dumps({"channel": "#alerts", "severity": "high"}),
)

async with await axon.subscribe_sse() as stream:
    async for event in stream:
        print(f"{event.sequence}: {event.source} -> {event.event_type}")
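
When several systems publish into one Axon, a subscriber usually needs to route each event by its source and event type. One possible shape for that routing is sketched below. The on decorator, HANDLERS table, and handler functions are hypothetical, and SimpleNamespace objects stand in for events received from the stream.

```python
import json
from types import SimpleNamespace

HANDLERS = {}


def on(source, event_type):
    """Register a handler for one (source, event_type) pair."""
    def register(func):
        HANDLERS[(source, event_type)] = func
        return func
    return register


@on("github", "push")
def handle_push(data):
    return f"build {data['ref']}"


@on("slack", "incident.created")
def handle_incident(data):
    return f"page on-call ({data['severity']})"


def dispatch(event):
    """Route an event to its handler; return None for unhandled pairs."""
    handler = HANDLERS.get((event.source, event.event_type))
    if handler is None:
        return None
    return handler(json.loads(event.payload))


push = SimpleNamespace(source="github", event_type="push",
                       payload=json.dumps({"ref": "refs/heads/main"}))
incident = SimpleNamespace(source="slack", event_type="incident.created",
                           payload=json.dumps({"severity": "high"}))
unknown = SimpleNamespace(source="jira", event_type="ticket", payload="{}")
results = [dispatch(push), dispatch(incident), dispatch(unknown)]
```

Inside the subscription loop, each received event would be passed to dispatch(event), so handlers run in sequence order.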

Reconnecting to an Existing Axon

If you already have an axon ID (e.g. stored from a previous session), you can reconnect without creating a new one.
from datetime import datetime
from runloop_api_client import AsyncRunloopSDK

runloop = AsyncRunloopSDK()
axon = runloop.axon.from_id("axn_abc123")

info = await axon.get_info()
print(f"Axon: {info.name}, created: {datetime.fromtimestamp(info.created_at_ms / 1000)}")

async with await axon.subscribe_sse() as stream:
    async for event in stream:
        print(event)
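
This page does not say whether a new subscription replays earlier events or delivers only new ones. If replay can occur, a client that stored the last sequence number it processed can skip anything at or below that value. The sketch below is a client-side guard under that assumption; only_new and replayed_stream are hypothetical, and the fake stream stands in for the SSE connection.

```python
import asyncio
from types import SimpleNamespace


async def only_new(events, last_seen):
    """Yield events whose sequence is greater than last_seen, in arrival order."""
    async for event in events:
        if event.sequence <= last_seen:
            continue  # already processed in an earlier session
        last_seen = event.sequence
        yield event


async def replayed_stream():
    # Stand-in for a subscription that starts from the beginning of the stream.
    for seq in (1, 2, 3, 4):
        yield SimpleNamespace(sequence=seq, event_type="demo")


async def main():
    return [e.sequence async for e in only_new(replayed_stream(), last_seen=2)]


fresh = asyncio.run(main())
```

Because sequence numbers are monotonic, a single integer is enough to record how far a consumer has read.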

Managing Axons

List Active Axons

from runloop_api_client import AsyncRunloopSDK

runloop = AsyncRunloopSDK()
axons = await runloop.axon.list()
for axon in axons:
    info = await axon.get_info()
    print(f"{info.id}: {info.name or '(unnamed)'}")

Retrieve an Axon

from datetime import datetime
from runloop_api_client import AsyncRunloopSDK

runloop = AsyncRunloopSDK()
axon = runloop.axon.from_id("axn_abc123")
info = await axon.get_info()
print(f"Name: {info.name}")
print(f"Created: {datetime.fromtimestamp(info.created_at_ms / 1000)}")

Limitations, Nuances, and Constraints

Keep the following constraints in mind when building workflows on Axons:
  • Axon events are immutable. Once an event is published, it cannot be modified or deleted.
  • Axons are currently limited to 10 GB of event data.

Related Topics

  • SQL Database: Embedded SQLite for structured state within an Axon
  • Broker: Learn how Runloop bridges Axons to agents running in devboxes
  • Devbox Overview: Learn about Runloop’s sandbox environments
  • Tunnels: Expose services running in a devbox
  • Agent Gateways: Securely proxy API requests
  • SDKs: TypeScript and Python SDK installation