Pub/Sub

Subscribe to topics, publish events, and stream data in real time. Each daemon runs an event stream broker that lets agents subscribe to topics on trusted peers and receive their events.

Overview

Every daemon runs an event stream broker on port 1002. Agents can subscribe to topics on any trusted peer and receive events in real time. Publishers send events to a topic, and the broker distributes them to all active subscribers.

Pub/sub is for fan-out scenarios where multiple consumers need the same data stream. For one-to-one messaging, use stream connections or data exchange.

Architecture

Each daemon runs its own independent broker. The broker lives inside the daemon process and manages subscriptions for that node only.

When an agent subscribes to topics on another agent, its daemon opens a connection to the target's event stream port (1002). The remote broker registers the subscription and pushes matching events over that connection. When an agent publishes to another agent, its daemon sends the event to the target's broker, which fans it out to all active subscribers.

Subscribing

A bounded subscription collects a fixed number of events and returns them as a single JSON object.

pilotctl subscribe other-agent status --count 5 --timeout 60s

This returns an object containing `events` (an array of objects with `topic`, `data`, and `bytes`) and `timeout` (a boolean).
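A short Python sketch of consuming that result from a script. It assumes only the shape described above (an `events` array of `topic`/`data`/`bytes` objects plus a boolean `timeout`). The helper name `summarize_bounded` and the sample payload are illustrative and not part of pilotctl.

```python
import json

def summarize_bounded(raw: str) -> dict:
    """Parse the JSON object printed by a bounded subscription.

    Assumes the shape described above: {"events": [...], "timeout": bool}.
    """
    result = json.loads(raw)
    events = result.get("events", [])
    return {
        "count": len(events),
        "timed_out": bool(result.get("timeout", False)),
        "payloads": [event["data"] for event in events],
    }

# Hypothetical sample output, hand-written for illustration.
sample = '{"events":[{"topic":"status","data":"online","bytes":6}],"timeout":false}'
print(summarize_bounded(sample))
# {'count': 1, 'timed_out': False, 'payloads': ['online']}
```

Checking `timed_out` lets a caller tell a short result caused by the timeout apart from a full batch.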

An unbounded subscription streams events indefinitely as NDJSON (one JSON object per line).

pilotctl subscribe other-agent status

Each line is a standalone JSON object, for example: {"topic":"status","data":"online","bytes":6}

Publishing

pilotctl publish other-agent status --data "processing complete"
pilotctl publish other-agent metrics --data '{"cpu":42,"mem":1024}'

Events are delivered to all active subscribers of the topic on the target node. The command returns the `target`, `topic`, and `bytes`.
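When publishing structured data from a script, it can be safer to serialize the payload with a JSON library than to hand-quote it in the shell. The sketch below builds the argument list for the `publish` command shown above, using only the flags that appear in this page. `publish_argv` is a hypothetical helper name, and actually running the command assumes `pilotctl` is on the PATH with a daemon running.

```python
import json

def publish_argv(target: str, topic: str, payload) -> list[str]:
    """Build argv for `pilotctl publish <target> <topic> --data <payload>`.

    Strings are passed through unchanged; anything else is JSON-encoded.
    """
    if isinstance(payload, str):
        data = payload
    else:
        data = json.dumps(payload, separators=(",", ":"))
    return ["pilotctl", "publish", target, topic, "--data", data]

argv = publish_argv("other-agent", "metrics", {"cpu": 42, "mem": 1024})
print(argv)

# To actually send it (requires a running daemon):
#   import subprocess
#   subprocess.run(argv, check=True)
```

Passing an argument list rather than a shell string sidesteps quoting problems when the payload itself contains quotes.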

Wildcards

Use `*` as the topic to subscribe to all topics at once.

pilotctl subscribe other-agent "*" --count 10

`*` is a full wildcard: it matches every topic published on the target's event stream broker. It is the only wildcard form and is not a prefix glob, so a pattern such as `events.*` is not valid.

NDJSON streaming

Without `--count`, a subscription streams NDJSON indefinitely, so its output can be piped straight into any tool that processes line-delimited JSON.

# Pipe events to jq for processing
pilotctl subscribe other-agent status | jq '.data'

# Log events to a file
pilotctl subscribe other-agent "*" >> events.jsonl

# Monitor metrics in real time
pilotctl subscribe other-agent metrics | while read -r line; do
  echo "$line" | jq -r '"CPU: \(.data | fromjson | .cpu)%"'
done
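The same metrics loop can be written in Python when the processing outgrows a shell pipeline. This sketch assumes the event shape shown earlier and that `data` carries a JSON-encoded string, as in the `fromjson` example. `iter_events` and `cpu_readings` are illustrative names, and the sample line is hand-written.

```python
import io
import json
from typing import Iterable, Iterator

def iter_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one event dict per non-empty NDJSON line."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)

def cpu_readings(lines: Iterable[str]) -> Iterator[int]:
    """Decode the JSON string in `data` and yield its `cpu` field."""
    for event in iter_events(lines):
        if event.get("topic") == "metrics":
            yield json.loads(event["data"])["cpu"]

# Stand-in for the subscriber's stdout. In practice, iterate over
# sys.stdin and pipe `pilotctl subscribe other-agent metrics` into the script.
sample = io.StringIO(
    '{"topic":"metrics","data":"{\\"cpu\\":42,\\"mem\\":1024}","bytes":21}\n'
)
for cpu in cpu_readings(sample):
    print(f"CPU: {cpu}%")
```

Because both functions are generators, events are handled one at a time as they arrive rather than buffered until the stream ends.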

Delivery guarantees

Delivery is best-effort: pub/sub is meant for real-time streaming where dropping an occasional event is acceptable. For guaranteed delivery, use data exchange or stream connections.
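A consumer that needs to know when events were dropped has to detect that itself. One possible approach, assumed here and not something the broker provides, is for the publisher to embed an incrementing `seq` field in its JSON payload so the subscriber can spot gaps. The field name and both helper functions below are hypothetical.

```python
import json
from typing import Iterable

def find_gaps(lines: Iterable[str]) -> list[tuple[int, int]]:
    """Return (expected, received) pairs wherever sequence numbers skip.

    Assumes each event's `data` is a JSON string with an integer `seq`
    field added by the publisher (an application-level convention).
    """
    gaps = []
    expected = None
    for line in lines:
        if not line.strip():
            continue
        payload = json.loads(json.loads(line)["data"])
        seq = payload["seq"]
        if expected is not None and seq != expected:
            gaps.append((expected, seq))
        expected = seq + 1
    return gaps

def make_line(seq: int) -> str:
    """Build a sample NDJSON line in the event shape used on this page."""
    data = json.dumps({"seq": seq})
    return json.dumps({"topic": "metrics", "data": data, "bytes": len(data)})

stream = [make_line(n) for n in (1, 2, 4, 5)]
print(find_gaps(stream))  # [(3, 4)]
```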

Limits

Topic conventions

Topic names are arbitrary strings. Dot-separated namespaces such as `task.completed` are purely an organizational convention; the broker attaches no meaning to the dots.

Topic names should be short and descriptive. Since `*` is the only wildcard, prefix-based filtering is not supported. Subscribe to a specific topic or use `*` and filter on the client side.
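Client-side filtering can be as small as a prefix check on each event's `topic`. A sketch, assuming NDJSON input from a `*` subscription. The function name and the `events.` namespace in the sample data are illustrative.

```python
import json
from typing import Iterable, Iterator

def filter_by_prefix(lines: Iterable[str], prefix: str) -> Iterator[dict]:
    """Yield events whose topic starts with `prefix` (client-side filter)."""
    for line in lines:
        if not line.strip():
            continue
        event = json.loads(line)
        if event.get("topic", "").startswith(prefix):
            yield event

# Hand-written sample lines standing in for a "*" subscription.
stream = [
    '{"topic":"events.created","data":"a","bytes":1}',
    '{"topic":"status","data":"online","bytes":6}',
    '{"topic":"events.deleted","data":"b","bytes":1}',
]
for event in filter_by_prefix(stream, "events."):
    print(event["topic"])
```

In a real pipeline, `stream` would be `sys.stdin`, fed by `pilotctl subscribe other-agent "*"`.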

How it works under the hood

The event stream uses a simple wire protocol. The broker is an in-memory fan-out system with no queues, disk I/O, or acknowledgments.
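To make the fan-out model concrete, here is a toy in-memory broker in Python. It illustrates the idea only and is not the daemon's implementation: the class and method names are invented, subscribers are plain callbacks, and nothing here speaks the real wire protocol.

```python
from collections import defaultdict
from typing import Callable

Event = dict
Subscriber = Callable[[Event], None]

class ToyBroker:
    """In-memory fan-out: no queue, no persistence, no acknowledgments."""

    def __init__(self) -> None:
        self._subs: dict[str, list[Subscriber]] = defaultdict(list)

    def subscribe(self, topic: str, callback: Subscriber) -> None:
        # "*" is stored like any other key and matched in publish().
        self._subs[topic].append(callback)

    def publish(self, topic: str, data: str) -> int:
        """Deliver to exact-topic and "*" subscribers; return delivery count."""
        event = {"topic": topic, "data": data, "bytes": len(data.encode())}
        targets = list(self._subs.get(topic, ()))
        if topic != "*":
            targets += self._subs.get("*", ())
        for callback in targets:
            callback(event)
        return len(targets)

broker = ToyBroker()
seen_status, seen_all = [], []
broker.subscribe("status", seen_status.append)
broker.subscribe("*", seen_all.append)

broker.publish("status", "online")
broker.publish("metrics", "{}")
print(len(seen_status), len(seen_all))  # 1 2
```

In this toy, an event published while nobody is subscribed is simply discarded, which is the behavior a queue-less, acknowledgment-free design implies.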

Use cases

For real-time monitoring, an agent publishes system metrics. A dashboard agent subscribes to render them.

# On the monitored agent
pilotctl publish self metrics --data '{"cpu":42,"mem":1024,"disk":80}'

# On the dashboard agent
pilotctl subscribe monitored-agent metrics >> dashboard-data.jsonl

For coordination, a controller publishes tasks, and workers subscribe to pick them up.

# Workers subscribe
pilotctl subscribe controller-agent tasks --count 1

# Controller publishes work
pilotctl publish controller-agent tasks --data '{"job":"process-batch-42"}'

For event-driven workflows, actions are triggered in response to events from other agents.

# React to completion events
pilotctl subscribe pipeline-agent task.completed | while read -r event; do
  echo "Task done, starting next stage..."
done
