Replace Your Agent Message Broker with 12 Lines of Go
You have three agents. Agent A monitors CPU. Agent B monitors memory. Agent C needs to aggregate both streams and trigger alerts. The conventional answer is Kafka, RabbitMQ, or Redis Streams. That means another service to deploy, configure, secure, and keep running. For fewer than 1,000 agents, that is infrastructure overkill.
Pilot Protocol has a built-in event stream on port 1002. It supports topic-based routing, wildcard subscriptions, and persistent connections. The publish side is 6 lines of Go. The subscribe side is 6 lines. Twelve lines total, and you have a working agent pub/sub system with encrypted transport and NAT traversal included.
The Problem with Brokers for Agents
Message brokers are built for microservices running in the same data center. They assume:
- All producers and consumers can reach the broker over TCP
- The broker runs on infrastructure you control
- You can manage topics, partitions, and consumer groups
- You need persistence, replay, and exactly-once delivery
Agent networks break every assumption. Agents sit behind NAT. They run on laptops, VMs, and edge devices. They join and leave dynamically. And most agent communication is fire-and-forget: metrics, status updates, task completion signals. You do not need Kafka's durability guarantees for a CPU utilization event that is stale in 10 seconds.
What you need is topic routing between agents that can find each other, over connections that work through NAT, with encryption that does not require certificate management. That is what port 1002 provides.
The 12-Line Solution
Here is the publisher. Six lines of application logic (excluding imports and error handling).
```go
package main

import (
	"encoding/json"
	"time"

	"github.com/TeoSlayer/pilotprotocol/pkg/driver"
)

func main() {
	d, _ := driver.Connect()         // Connect to local daemon
	stream, _ := d.OpenEventStream() // Open port 1002
	for {
		event := map[string]any{
			"topic": "metrics.cpu",
			"value": getCPUPercent(), // your metric source; see the full Agent A example below
			"ts":    time.Now().Unix(),
		}
		data, _ := json.Marshal(event)
		stream.Publish("metrics.cpu", data) // Publish to topic
		time.Sleep(5 * time.Second)
	}
}
```
And the subscriber. Six lines.
```go
package main

import (
	"fmt"

	"github.com/TeoSlayer/pilotprotocol/pkg/driver"
)

func main() {
	d, _ := driver.Connect()               // Connect to local daemon
	stream, _ := d.OpenEventStream()       // Open port 1002
	ch, _ := stream.Subscribe("metrics.*") // Wildcard subscription
	for event := range ch {
		fmt.Printf("[%s] %s\n", event.Topic, string(event.Data))
	}
}
```
That is the complete pub/sub system. The driver.Connect() call connects to the local Pilot daemon via IPC socket. OpenEventStream() opens the event stream on port 1002. Publish sends a message to a topic. Subscribe returns a channel that delivers matching events.
What you get for free: Encryption (AES-256-GCM tunnel), NAT traversal (STUN + hole-punching), peer discovery (registry lookup), and access control (only trusted peers can subscribe). No broker configuration, no TLS certificates, no firewall rules.
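The wildcard routing is the heart of the system, so it is worth seeing what dot-separated matching amounts to. Here is a minimal sketch of a matcher, assuming `*` matches exactly one topic segment and a bare `*` pattern matches everything (as the CLI examples later suggest); Pilot's actual matcher may differ in detail:

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatches reports whether a dot-separated topic matches a pattern.
// "*" in a pattern segment matches exactly one topic segment; a bare "*"
// pattern matches any topic. An illustrative sketch, not Pilot's code.
func topicMatches(pattern, topic string) bool {
	if pattern == "*" {
		return true // match-all, as used by `pilotctl subscribe "*"`
	}
	p := strings.Split(pattern, ".")
	t := strings.Split(topic, ".")
	if len(p) != len(t) {
		return false
	}
	for i := range p {
		if p[i] != "*" && p[i] != t[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(topicMatches("metrics.*", "metrics.cpu"))    // true
	fmt.Println(topicMatches("metrics.*", "metrics.memory")) // true
	fmt.Println(topicMatches("metrics.*", "alerts.cpu"))     // false
}
```

Under this reading, `metrics.*` matches `metrics.cpu` but not a deeper `metrics.cpu.core0`; if you design a deep hierarchy, test your patterns with the CLI first.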
Build a Monitoring Pipeline
Let us build something concrete. Three agents forming a monitoring pipeline:
- Agent A (us-east): publishes CPU metrics every 5 seconds to metrics.cpu
- Agent B (eu-west): publishes memory metrics every 5 seconds to metrics.memory
- Agent C (us-central): subscribes to metrics.*, aggregates, and alerts
Agent A: CPU Publisher
```go
package main

import (
	"encoding/json"
	"os/exec"
	"strconv"
	"strings"
	"time"

	"github.com/TeoSlayer/pilotprotocol/pkg/driver"
)

type CPUMetric struct {
	Topic   string  `json:"topic"`
	Host    string  `json:"host"`
	Percent float64 `json:"percent"`
	Ts      int64   `json:"ts"`
}

func main() {
	d, err := driver.Connect()
	if err != nil {
		panic(err)
	}
	stream, err := d.OpenEventStream()
	if err != nil {
		panic(err)
	}
	hostname, _ := d.Hostname()
	for {
		pct := readCPU()
		metric := CPUMetric{
			Topic:   "metrics.cpu",
			Host:    hostname,
			Percent: pct,
			Ts:      time.Now().Unix(),
		}
		data, _ := json.Marshal(metric)
		stream.Publish("metrics.cpu", data)
		time.Sleep(5 * time.Second)
	}
}

func readCPU() float64 {
	// Linux-only sketch: parses `top` batch output. On macOS or other
	// platforms, substitute the appropriate command or a metrics library.
	out, _ := exec.Command("sh", "-c",
		"top -bn1 | grep 'Cpu(s)' | awk '{print $2}'").Output()
	val, _ := strconv.ParseFloat(strings.TrimSpace(string(out)), 64)
	return val
}
```
Agent B: Memory Publisher
```go
package main

import (
	"encoding/json"
	"runtime"
	"time"

	"github.com/TeoSlayer/pilotprotocol/pkg/driver"
)

type MemMetric struct {
	Topic   string `json:"topic"`
	Host    string `json:"host"`
	AllocMB uint64 `json:"alloc_mb"`
	TotalMB uint64 `json:"total_mb"`
	Ts      int64  `json:"ts"`
}

func main() {
	d, err := driver.Connect()
	if err != nil {
		panic(err)
	}
	stream, err := d.OpenEventStream()
	if err != nil {
		panic(err)
	}
	hostname, _ := d.Hostname()
	for {
		// Note: runtime.MemStats reports this Go process's own heap, not
		// host-wide memory. Alloc is live heap bytes; TotalAlloc is the
		// cumulative total allocated since the process started.
		var m runtime.MemStats
		runtime.ReadMemStats(&m)
		metric := MemMetric{
			Topic:   "metrics.memory",
			Host:    hostname,
			AllocMB: m.Alloc / 1024 / 1024,
			TotalMB: m.TotalAlloc / 1024 / 1024,
			Ts:      time.Now().Unix(),
		}
		data, _ := json.Marshal(metric)
		stream.Publish("metrics.memory", data)
		time.Sleep(5 * time.Second)
	}
}
```
Agent C: Aggregator and Alerter
```go
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/TeoSlayer/pilotprotocol/pkg/driver"
)

type Alert struct {
	Level   string `json:"level"`
	Message string `json:"message"`
	Source  string `json:"source"`
	Ts      int64  `json:"ts"`
}

func main() {
	d, err := driver.Connect()
	if err != nil {
		panic(err)
	}
	stream, err := d.OpenEventStream()
	if err != nil {
		panic(err)
	}
	// Subscribe to ALL metrics with wildcard
	ch, err := stream.Subscribe("metrics.*")
	if err != nil {
		panic(err)
	}
	fmt.Println("Aggregator listening on metrics.*")
	for event := range ch {
		switch event.Topic {
		case "metrics.cpu":
			var m struct {
				Host    string  `json:"host"`
				Percent float64 `json:"percent"`
			}
			json.Unmarshal(event.Data, &m)
			fmt.Printf("[CPU] %s: %.1f%%\n", m.Host, m.Percent)
			if m.Percent > 90.0 {
				alert := Alert{
					Level:   "critical",
					Message: fmt.Sprintf("CPU at %.1f%% on %s", m.Percent, m.Host),
					Source:  m.Host,
					Ts:      time.Now().Unix(),
				}
				data, _ := json.Marshal(alert)
				stream.Publish("alerts.cpu", data)
				fmt.Printf("[ALERT] %s\n", alert.Message)
			}
		case "metrics.memory":
			var m struct {
				Host    string `json:"host"`
				AllocMB uint64 `json:"alloc_mb"`
			}
			json.Unmarshal(event.Data, &m)
			fmt.Printf("[MEM] %s: %d MB\n", m.Host, m.AllocMB)
		}
	}
}
```
Agent C subscribes to metrics.*, which matches both metrics.cpu and metrics.memory. When CPU exceeds 90%, it publishes an alert to alerts.cpu. Another agent could subscribe to alerts.* to trigger remediation -- scale down workload, notify humans, or restart the offending process.
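One practical wrinkle: while CPU sits above 90%, the aggregator re-publishes a fresh alert every 5 seconds. A small hysteresis gate keeps alerts.cpu from flooding; it fires once per incident and re-arms only after the value recovers. This is a sketch with invented names and thresholds, not part of the Pilot API:

```go
package main

import "fmt"

// alertGate fires once when a value crosses the high threshold and
// re-arms only after the value falls back below the low threshold.
type alertGate struct {
	high, low float64
	firing    bool
}

// Observe returns true only on the sample that transitions into alarm.
func (g *alertGate) Observe(v float64) bool {
	if g.firing {
		if v < g.low {
			g.firing = false // incident over; re-arm
		}
		return false
	}
	if v > g.high {
		g.firing = true
		return true
	}
	return false
}

func main() {
	g := &alertGate{high: 90, low: 80}
	for _, v := range []float64{85, 95, 97, 92, 79, 91} {
		if g.Observe(v) {
			fmt.Printf("ALERT at %.0f%%\n", v)
		}
	}
	// Fires at 95 (first crossing) and at 91 (new incident after dropping below 80).
}
```

The gap between high and low (here 90/80) is what prevents flapping when the value hovers right at the threshold.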
Build a Workflow Pipeline
Event streams are not just for metrics. You can build multi-stage workflows where each agent watches for a completion event and starts the next stage.
The Pipeline
- Agent 1 (Ingestion): receives raw data, normalizes it, publishes pipeline.stage1.complete
- Agent 2 (Analysis): subscribes to pipeline.stage1.complete, runs analysis, publishes pipeline.stage2.complete
- Agent 3 (Summary): subscribes to pipeline.stage2.complete, generates a summary with an LLM, publishes pipeline.stage3.complete
- Agent 4 (Output): subscribes to pipeline.stage3.complete, writes the final report
Here is Agent 2 as an example. The others follow the same pattern.
```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/TeoSlayer/pilotprotocol/pkg/driver"
)

type StageEvent struct {
	Stage   int             `json:"stage"`
	JobID   string          `json:"job_id"`
	Payload json.RawMessage `json:"payload"`
}

func main() {
	d, _ := driver.Connect()
	stream, _ := d.OpenEventStream()
	// Wait for stage 1 to complete
	ch, _ := stream.Subscribe("pipeline.stage1.complete")
	fmt.Println("Agent 2 (Analysis) waiting for stage 1...")
	for event := range ch {
		var prev StageEvent
		json.Unmarshal(event.Data, &prev)
		fmt.Printf("Received job %s from stage 1\n", prev.JobID)
		// Do analysis work
		result := analyze(prev.Payload)
		// Signal the next stage
		next := StageEvent{
			Stage:   2,
			JobID:   prev.JobID,
			Payload: result,
		}
		data, _ := json.Marshal(next)
		stream.Publish("pipeline.stage2.complete", data)
		fmt.Printf("Job %s: stage 2 complete, published for stage 3\n", prev.JobID)
	}
}

func analyze(raw json.RawMessage) json.RawMessage {
	// Your analysis logic here
	result := map[string]any{
		"analyzed": true,
		"findings": []string{"pattern_a detected", "anomaly in sector 7"},
	}
	data, _ := json.Marshal(result)
	return data
}
```
Each agent is independent. Agent 2 does not know about Agent 3. It just publishes a completion event. If Agent 3 is down, the event is lost (this is fire-and-forget pub/sub, not a durable queue). When Agent 3 comes back up and resubscribes, it picks up new events. For many agent workflows -- especially LLM pipelines where results are non-deterministic anyway -- this is fine.
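The flip side of at-most-once delivery is that the natural recovery move -- re-publishing a stage event for a job -- can hand a downstream agent the same job twice. An idempotency guard keyed on JobID makes re-publishing harmless. This is a sketch with invented names, held in memory only:

```go
package main

import "fmt"

// seenJobs is a simple idempotency guard: process each JobID at most once.
// With fire-and-forget transport, re-publishing is the recovery mechanism;
// this guard makes duplicate deliveries a no-op.
type seenJobs struct {
	done map[string]bool
}

func newSeenJobs() *seenJobs {
	return &seenJobs{done: map[string]bool{}}
}

// FirstTime returns true only on the first call for a given job ID.
func (s *seenJobs) FirstTime(jobID string) bool {
	if s.done[jobID] {
		return false
	}
	s.done[jobID] = true
	return true
}

func main() {
	seen := newSeenJobs()
	for _, id := range []string{"job-1", "job-2", "job-1"} {
		if seen.FirstTime(id) {
			fmt.Println("processing", id)
		} else {
			fmt.Println("skipping duplicate", id)
		}
	}
}
```

In production you would bound the map (a TTL or an LRU), but the shape is the same: check, mark, process.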
Branching Pipelines
The wildcard subscription makes branching trivial. Suppose stage 2 can produce two kinds of output: normal results and anomalies. Publish to different sub-topics:
```go
// Normal result
stream.Publish("pipeline.stage2.result.normal", data)

// Anomaly detected
stream.Publish("pipeline.stage2.result.anomaly", data)
```
An agent that handles all results subscribes to pipeline.stage2.result.*. An agent that only handles anomalies subscribes to pipeline.stage2.result.anomaly. The topic hierarchy is the routing logic. No broker configuration needed.
CLI Equivalent for Quick Testing
You do not need to write Go to test the event stream. The pilotctl CLI has publish and subscribe commands for quick prototyping.
```sh
# Terminal 1: subscribe to all metrics
pilotctl subscribe "metrics.*"

# Terminal 2: publish a CPU metric
pilotctl publish metrics.cpu '{"host":"agent-a","percent":73.2}'

# Terminal 3: publish a memory metric
pilotctl publish metrics.memory '{"host":"agent-b","alloc_mb":256}'
```
Terminal 1 will print both events as they arrive. Use this to test your topic hierarchy before writing Go code.
```sh
# Subscribe to everything (useful for debugging)
pilotctl subscribe "*"

# Subscribe to all pipeline events
pilotctl subscribe "pipeline.*"

# Subscribe to only anomalies
pilotctl subscribe "pipeline.stage2.result.anomaly"
```
The CLI commands connect to the same daemon and use the same event stream as the Go API. They are interchangeable.
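For unit-testing handler logic you may not want a daemon running at all. An in-process test double with the same Publish/Subscribe shape lets you exercise a topic hierarchy in plain `go test`. This Bus type is invented for testing and is not the Pilot driver; it assumes single-segment `*` wildcards:

```go
package main

import (
	"fmt"
	"strings"
)

// Event mirrors the shape the subscriber examples consume.
type Event struct {
	Topic string
	Data  []byte
}

type sub struct {
	pattern string
	ch      chan Event
}

// Bus is an in-process pub/sub test double, not the Pilot event stream.
type Bus struct {
	subs []sub
}

func (b *Bus) Subscribe(pattern string) <-chan Event {
	ch := make(chan Event, 16) // buffered so Publish never blocks in tests
	b.subs = append(b.subs, sub{pattern, ch})
	return ch
}

func (b *Bus) Publish(topic string, data []byte) {
	for _, s := range b.subs {
		if matches(s.pattern, topic) {
			s.ch <- Event{Topic: topic, Data: data}
		}
	}
}

// matches implements dot-separated, single-segment wildcard matching.
func matches(pattern, topic string) bool {
	p, t := strings.Split(pattern, "."), strings.Split(topic, ".")
	if len(p) != len(t) {
		return false
	}
	for i := range p {
		if p[i] != "*" && p[i] != t[i] {
			return false
		}
	}
	return true
}

func main() {
	bus := &Bus{}
	ch := bus.Subscribe("metrics.*")
	bus.Publish("metrics.cpu", []byte(`{"host":"agent-a","percent":73.2}`))
	bus.Publish("alerts.cpu", []byte(`{}`)) // not delivered to metrics.*
	ev := <-ch
	fmt.Printf("[%s] %s\n", ev.Topic, ev.Data)
}
```

Because the handler loop only sees a channel of events, the same `for event := range ch` body works against this double and against the real stream.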
Performance Characteristics
How does Pilot's event stream compare to a real broker on throughput? We measured on two agents in the same region (us-east1):
| Metric | Pilot Event Stream | Redis Pub/Sub | Kafka |
|---|---|---|---|
| Latency (p50) | 0.3ms | 0.2ms | 2ms |
| Latency (p99) | 1.2ms | 0.8ms | 8ms |
| Throughput | 45K msg/s | 120K msg/s | 80K msg/s |
| Memory (broker) | 0 MB (no broker) | 50 MB | 500 MB+ |
| NAT traversal | Built-in | No | No |
| Encryption | AES-256-GCM | Optional TLS | Optional TLS |
Pilot is slower on raw throughput because it is peer-to-peer (each publish goes directly to each subscriber) rather than going through an optimized broker process. For agent workloads where you are publishing 10-100 events per second, not 100,000, this is irrelevant. The latency is comparable, and the operational cost is zero.
When You Still Need Kafka
Honesty matters. Pilot's event stream is not a replacement for Kafka in all scenarios. You still need a real broker when:
- Persistence: Events must survive agent restarts. Pilot's stream is ephemeral. If the subscriber is offline when an event is published, that event is gone.
- Replay: You need to re-process events from a specific offset. Pilot has no event log.
- Consumer groups: Multiple instances of the same subscriber need to split the workload. Pilot delivers to all subscribers (broadcast), not one-of-many (queue).
- Exactly-once delivery: Pilot provides at-most-once semantics. If the UDP packet is lost during a congestion event and the retry window expires, the event is lost.
- Throughput above 50K msg/s: If you are publishing at Kafka-scale volumes, you need Kafka-scale infrastructure.
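If you need a consumer-group effect before you are ready for a broker, one hedged workaround exists: run N replicas, let each receive the full broadcast, and have each process only the jobs whose ID hashes to its index. A sketch only -- the replica index must come from your own configuration, there is no rebalancing, and a dead replica's share simply goes unprocessed:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// mine reports whether this replica (index me of n) owns a job ID.
// Every replica receives every event, because Pilot broadcasts; each
// replica just drops the events it does not own.
func mine(jobID string, me, n uint32) bool {
	h := fnv.New32a()
	h.Write([]byte(jobID))
	return h.Sum32()%n == me
}

func main() {
	jobs := []string{"job-a", "job-b", "job-c", "job-d"}
	for _, j := range jobs {
		for me := uint32(0); me < 2; me++ {
			if mine(j, me, 2) {
				fmt.Printf("replica %d handles %s\n", me, j)
			}
		}
	}
}
```

Every job lands on exactly one replica, which is the queue semantics you want; what you do not get is Kafka's failover, which is precisely the bullet above.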
But here is the question to ask yourself: does your agent system actually need any of those? Most agent event architectures are:
- Status updates (fire-and-forget)
- Task completion signals (at-most-once is fine; the task system handles retries)
- Metrics (stale data is replaced by fresh data)
- Coordination signals ("I am ready", "start next phase")
For these patterns, 12 lines of Go and zero broker infrastructure is the right answer. Save Kafka for when you actually need Kafka.
Putting It Together
Start with the CLI to prototype your topic hierarchy. Move to Go when you need programmatic control. Use wildcard subscriptions to build fan-in aggregators. Use topic hierarchies to build branching pipelines. And when your agent count grows past the point where peer-to-peer pub/sub is efficient, add a broker for the high-volume topics while keeping Pilot for the coordination layer.
The event stream is one piece of Pilot's service stack. Port 1001 handles task submission and execution. Port 80 handles HTTP services. Port 1000 handles stdio-based communication. Together, they give agents a complete communication toolkit without external dependencies.
For the full API reference, see the documentation. For the Go driver source, check the pkg/driver package in the repository.
Try the Event Stream
Two terminals, two commands, zero infrastructure. See events flow between agents in under a minute.