Subscribe to topics, publish events, and stream data in real time.
Every daemon runs an event stream broker on port 1002. Agents can subscribe to topics on any trusted peer and receive events in real time. Publishers send events to a topic, and the broker distributes them to all active subscribers.
Pub/sub is for fan-out scenarios where multiple consumers need the same data stream. For one-to-one messaging, use stream connections or data exchange.
Each daemon runs its own independent broker. The broker lives inside the daemon process and manages subscriptions for that node only.
When an agent subscribes to topics on another agent, its daemon opens a connection to the peer's event stream port (1002). The remote broker registers the subscription and pushes matching events over that connection. When an agent publishes to another agent, its daemon sends the event to the peer's broker, which fans it out to all active subscribers.
A bounded subscription collects a fixed number of events and returns a JSON array.
pilotctl subscribe other-agent status --count 5 --timeout 60s

This returns an object containing `events` (an array of objects with `topic`, `data`, and `bytes`) and `timeout` (a boolean).
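The returned object can be post-processed with standard JSON tooling. A sketch, assuming the result shape described above; the sample object here is illustrative, not real output:

```shell
# Illustrative bounded-subscription result standing in for the real command output.
result='{"events":[{"topic":"status","data":"online","bytes":6},{"topic":"status","data":"busy","bytes":4}],"timeout":false}'

# Extract just the event payloads.
echo "$result" | jq -r '.events[].data'
# → online
# → busy

# Check whether the subscription ended by hitting --timeout rather than --count.
echo "$result" | jq '.timeout'
# → false
```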
An unbounded subscription streams events indefinitely as NDJSON (one JSON object per line).
pilotctl subscribe other-agent status

Each line is a standalone JSON object, for example:

{"topic":"status","data":"online","bytes":6}
pilotctl publish other-agent status --data "processing complete"
pilotctl publish other-agent metrics --data '{"cpu":42,"mem":1024}'

Events are delivered to all active subscribers of the topic on the target node. The command returns the `target`, `topic`, and `bytes`.
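The publish return value can be checked in scripts before proceeding. A sketch, assuming the return shape described above; the sample object is illustrative, not real output:

```shell
# Illustrative publish return value standing in for the real command output.
ret='{"target":"other-agent","topic":"metrics","bytes":22}'

# jq -e sets the exit status from the result, so this gates the next step
# on the event having gone to the intended topic.
echo "$ret" | jq -e '.topic == "metrics"' > /dev/null && echo "published"
# → published
```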
Use `*` as the topic to subscribe to all topics at once.
pilotctl subscribe other-agent "*" --count 10

`*` is a full wildcard that matches every topic published on the target's event stream broker. It is not a prefix glob, so `events.*` is not valid syntax; `*` is the only wildcard form.
Without `--count`, subscriptions stream NDJSON indefinitely. This can be integrated with tools that process line-delimited JSON.
# Pipe events to jq for processing
pilotctl subscribe other-agent status | jq '.data'
# Log events to a file
pilotctl subscribe other-agent "*" >> events.jsonl
# Monitor metrics in real time
pilotctl subscribe other-agent metrics | while read -r line; do
echo "$line" | jq -r '"CPU: \(.data | fromjson | .cpu)%"'
done

Pub/sub is for real-time streaming where dropping an occasional event is acceptable. For guaranteed delivery, use data exchange or stream connections.
Topic names are arbitrary strings. Using dot-separated namespaces is a convention for organization.
Topic names should be short and descriptive. Since `*` is the only wildcard, prefix-based filtering is not supported. Subscribe to a specific topic or use `*` and filter on the client side.
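Client-side filtering of a `*` stream is a natural fit for jq's `select`. A sketch, using a hand-written NDJSON sample in place of a live `pilotctl subscribe other-agent "*"` stream (the `bytes` values are illustrative):

```shell
# Sample NDJSON events standing in for the live wildcard stream.
events='{"topic":"status","data":"online","bytes":6}
{"topic":"metrics","data":"{\"cpu\":42}","bytes":10}
{"topic":"status","data":"busy","bytes":4}'

# Keep only status events; everything else is discarded on the client side.
printf '%s\n' "$events" | jq -c 'select(.topic == "status")'
# → {"topic":"status","data":"online","bytes":6}
# → {"topic":"status","data":"busy","bytes":4}
```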
The event stream uses a simple wire protocol.
The broker is an in-memory fan-out system with no queues, disk I/O, or acknowledgments.
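The fan-out behavior — every attached subscriber receives a copy of each event, and nothing is queued for subscribers that are absent at publish time — can be pictured with `tee`. This is an analogy only, not how the broker is implemented:

```shell
# Analogy: tee duplicates each event line to every attached "subscriber".
# A subscriber not attached when the line goes by simply never sees it.
printf '%s\n' '{"topic":"status","data":"online","bytes":6}' \
  | tee subscriber-a.jsonl > subscriber-b.jsonl

# Both subscribers now hold identical copies of the event.
diff subscriber-a.jsonl subscriber-b.jsonl && echo "identical"
# → identical
```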
For real-time monitoring, an agent publishes system metrics. A dashboard agent subscribes to render them.
# On the monitored agent
pilotctl publish self metrics --data '{"cpu":42,"mem":1024,"disk":80}'
# On the dashboard agent
pilotctl subscribe monitored-agent metrics >> dashboard-data.jsonl

For coordination, a controller publishes tasks, and workers subscribe to pick them up.
# Workers subscribe
pilotctl subscribe controller-agent tasks --count 1
# Controller publishes work
pilotctl publish controller-agent tasks --data '{"job":"process-batch-42"}'

For event-driven workflows, actions are triggered in response to events from other agents.
# React to completion events
pilotctl subscribe pipeline-agent task.completed | while read -r event; do
echo "Task done, starting next stage..."
done
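The loop above discards the event payload, but the next stage often needs it. Since the `data` field carries the publisher's payload as a string, a JSON payload has to be parsed out with `fromjson`. A sketch, with a hand-written sample line standing in for the live stream (the `task` field and `bytes` value are illustrative):

```shell
# Sample NDJSON line standing in for the live task.completed stream.
printf '%s\n' '{"topic":"task.completed","data":"{\"task\":\"batch-42\"}","bytes":20}' \
  | while read -r event; do
      # data is a JSON string, so parse it before drilling in.
      task=$(printf '%s\n' "$event" | jq -r '.data | fromjson | .task')
      printf '%s\n' "Task $task done, starting next stage..."
    done
# → Task batch-42 done, starting next stage...
```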