Replace Your Agent Message Broker with 12 Lines of Go

February 15, 2026 tutorial pub-sub go

You have three agents. Agent A monitors CPU. Agent B monitors memory. Agent C needs to aggregate both streams and trigger alerts. The conventional answer is Kafka, RabbitMQ, or Redis Streams. That means another service to deploy, configure, secure, and keep running. For fewer than 1,000 agents, that is infrastructure overkill.

Pilot Protocol has a built-in event stream on port 1002. It supports topic-based routing, wildcard subscriptions, and persistent connections. The publish side is 6 lines of Go. The subscribe side is 6 lines. Twelve lines total, and you have a working agent pub/sub system with encrypted transport and NAT traversal included.

The Problem with Brokers for Agents

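Message brokers are built for microservices running in the same data center, and they inherit that environment's assumptions: directly reachable hosts, long-lived servers, and stable membership.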

Agent networks break every assumption. Agents sit behind NAT. They run on laptops, VMs, and edge devices. They join and leave dynamically. And most agent communication is fire-and-forget: metrics, status updates, task completion signals. You do not need Kafka's durability guarantees for a CPU utilization event that is stale in 10 seconds.

What you need is topic routing between agents that can find each other, over connections that work through NAT, with encryption that does not require certificate management. That is what port 1002 provides.

The 12-Line Solution

Here is the publisher. Six lines of application logic (excluding imports and error handling).

package main

import (
    "encoding/json"
    "time"

    "github.com/TeoSlayer/pilotprotocol/pkg/driver"
)

func main() {
    d, _ := driver.Connect()            // Connect to local daemon
    stream, _ := d.OpenEventStream()     // Open port 1002

    for {
        event := map[string]any{
            "topic": "metrics.cpu",
            "value": getCPUPercent(),
            "ts":    time.Now().Unix(),
        }
        data, _ := json.Marshal(event)
        stream.Publish("metrics.cpu", data)  // Publish to topic
        time.Sleep(5 * time.Second)
    }
}

// getCPUPercent is a placeholder so the example compiles;
// Agent A below shows one way to sample real CPU usage.
func getCPUPercent() float64 { return 0 }

And the subscriber. Six lines.

package main

import (
    "fmt"

    "github.com/TeoSlayer/pilotprotocol/pkg/driver"
)

func main() {
    d, _ := driver.Connect()             // Connect to local daemon
    stream, _ := d.OpenEventStream()      // Open port 1002
    ch, _ := stream.Subscribe("metrics.*") // Wildcard subscription

    for event := range ch {
        fmt.Printf("[%s] %s\n", event.Topic, string(event.Data))
    }
}

That is the complete pub/sub system. The driver.Connect() call connects to the local Pilot daemon via an IPC socket. OpenEventStream() opens the event stream on port 1002. Publish sends a message to a topic. Subscribe returns a channel that delivers every event whose topic matches the subscription pattern.

What you get for free: encryption (AES-256-GCM tunnel), NAT traversal (STUN + hole-punching), peer discovery (registry lookup), and access control (only trusted peers can subscribe). No broker configuration, no TLS certificates, no firewall rules.

Build a Monitoring Pipeline

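Let us build something concrete. Three agents form a monitoring pipeline: Agent A publishes CPU readings to metrics.cpu, Agent B publishes memory readings to metrics.memory, and Agent C subscribes to metrics.*, aggregates both streams, and publishes alerts.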

Agent A: CPU Publisher

package main

import (
    "encoding/json"
    "os/exec"
    "strconv"
    "strings"
    "time"

    "github.com/TeoSlayer/pilotprotocol/pkg/driver"
)

type CPUMetric struct {
    Topic   string  `json:"topic"`
    Host    string  `json:"host"`
    Percent float64 `json:"percent"`
    Ts      int64   `json:"ts"`
}

func main() {
    d, err := driver.Connect()
    if err != nil {
        panic(err)
    }
    stream, err := d.OpenEventStream()
    if err != nil {
        panic(err)
    }

    hostname, _ := d.Hostname()

    for {
        pct := readCPU()
        metric := CPUMetric{
            Topic:   "metrics.cpu",
            Host:    hostname,
            Percent: pct,
            Ts:      time.Now().Unix(),
        }
        data, _ := json.Marshal(metric)
        stream.Publish("metrics.cpu", data)
        time.Sleep(5 * time.Second)
    }
}

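func readCPU() float64 {
    // Linux-only sketch: take the user-CPU percentage from one batch run of top.
    // On macOS or other platforms, swap in a platform-specific reader.
    // Errors are ignored here, so any failure is reported as 0.
    out, _ := exec.Command("sh", "-c",
        "top -bn1 | grep 'Cpu(s)' | awk '{print $2}'").Output()
    val, _ := strconv.ParseFloat(strings.TrimSpace(string(out)), 64)
    return val
}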

Agent B: Memory Publisher

package main

import (
    "encoding/json"
    "runtime"
    "time"

    "github.com/TeoSlayer/pilotprotocol/pkg/driver"
)

type MemMetric struct {
    Topic   string `json:"topic"`
    Host    string `json:"host"`
    AllocMB uint64 `json:"alloc_mb"`
    TotalMB uint64 `json:"total_mb"`
    Ts      int64  `json:"ts"`
}

func main() {
    d, err := driver.Connect()
    if err != nil {
        panic(err)
    }
    stream, err := d.OpenEventStream()
    if err != nil {
        panic(err)
    }

    hostname, _ := d.Hostname()

    for {
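        // Note: runtime.MemStats describes this Go process, not the host.
        // Alloc is live heap bytes; TotalAlloc is the cumulative number of
        // bytes allocated since start, so it only ever grows.
        var m runtime.MemStats
        runtime.ReadMemStats(&m)

        metric := MemMetric{
            Topic:   "metrics.memory",
            Host:    hostname,
            AllocMB: m.Alloc / 1024 / 1024,
            TotalMB: m.TotalAlloc / 1024 / 1024,
            Ts:      time.Now().Unix(),
        }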
        data, _ := json.Marshal(metric)
        stream.Publish("metrics.memory", data)
        time.Sleep(5 * time.Second)
    }
}

Agent C: Aggregator and Alerter

package main

import (
    "encoding/json"
    "fmt"
    "time"

    "github.com/TeoSlayer/pilotprotocol/pkg/driver"
)

type Alert struct {
    Level   string `json:"level"`
    Message string `json:"message"`
    Source  string `json:"source"`
    Ts      int64  `json:"ts"`
}

func main() {
    d, err := driver.Connect()
    if err != nil {
        panic(err)
    }
    stream, err := d.OpenEventStream()
    if err != nil {
        panic(err)
    }

    // Subscribe to ALL metrics with wildcard
    ch, err := stream.Subscribe("metrics.*")
    if err != nil {
        panic(err)
    }

    fmt.Println("Aggregator listening on metrics.*")

    for event := range ch {
        switch event.Topic {
        case "metrics.cpu":
            var m struct {
                Host    string  `json:"host"`
                Percent float64 `json:"percent"`
            }
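            if err := json.Unmarshal(event.Data, &m); err != nil {
                fmt.Printf("[CPU] bad payload: %v\n", err)
                continue // skip malformed events instead of alerting on zero values
            }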
            fmt.Printf("[CPU] %s: %.1f%%\n", m.Host, m.Percent)

            if m.Percent > 90.0 {
                alert := Alert{
                    Level:   "critical",
                    Message: fmt.Sprintf("CPU at %.1f%% on %s", m.Percent, m.Host),
                    Source:  m.Host,
                    Ts:      time.Now().Unix(),
                }
                data, _ := json.Marshal(alert)
                stream.Publish("alerts.cpu", data)
                fmt.Printf("[ALERT] %s\n", alert.Message)
            }

        case "metrics.memory":
            var m struct {
                Host    string `json:"host"`
                AllocMB uint64 `json:"alloc_mb"`
            }
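            if err := json.Unmarshal(event.Data, &m); err != nil {
                fmt.Printf("[MEM] bad payload: %v\n", err)
                continue // skip malformed events
            }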
            fmt.Printf("[MEM] %s: %d MB\n", m.Host, m.AllocMB)
        }
    }
}

Agent C subscribes to metrics.*, which matches both metrics.cpu and metrics.memory. When CPU exceeds 90%, it publishes an alert to alerts.cpu. Another agent could subscribe to alerts.* to trigger remediation: scaling down the workload, notifying humans, or restarting the offending process.

Build a Workflow Pipeline

Event streams are not just for metrics. You can build multi-stage workflows where each agent watches for a completion event and starts the next stage.

The Pipeline

  1. Agent 1 (Ingestion): Receives raw data, normalizes it, publishes pipeline.stage1.complete
  2. Agent 2 (Analysis): Subscribes to pipeline.stage1.complete, runs analysis, publishes pipeline.stage2.complete
  3. Agent 3 (Summary): Subscribes to pipeline.stage2.complete, generates summary with LLM, publishes pipeline.stage3.complete
  4. Agent 4 (Output): Subscribes to pipeline.stage3.complete, writes final report

Here is Agent 2 as an example. The others follow the same pattern.

package main

import (
    "encoding/json"
    "fmt"

    "github.com/TeoSlayer/pilotprotocol/pkg/driver"
)

type StageEvent struct {
    Stage   int             `json:"stage"`
    JobID   string          `json:"job_id"`
    Payload json.RawMessage `json:"payload"`
}

func main() {
    d, err := driver.Connect()
    if err != nil {
        panic(err)
    }
    stream, err := d.OpenEventStream()
    if err != nil {
        panic(err)
    }

    // Wait for stage 1 to complete
    ch, err := stream.Subscribe("pipeline.stage1.complete")
    if err != nil {
        panic(err)
    }

    fmt.Println("Agent 2 (Analysis) waiting for stage 1...")

    for event := range ch {
        var prev StageEvent
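        if err := json.Unmarshal(event.Data, &prev); err != nil {
            fmt.Printf("Skipping malformed stage 1 event: %v\n", err)
            continue // do not forward a job with an empty JobID
        }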

        fmt.Printf("Received job %s from stage 1\n", prev.JobID)

        // Do analysis work
        result := analyze(prev.Payload)

        // Signal next stage
        next := StageEvent{
            Stage:   2,
            JobID:   prev.JobID,
            Payload: result,
        }
        data, _ := json.Marshal(next)
        stream.Publish("pipeline.stage2.complete", data)

        fmt.Printf("Job %s: stage 2 complete, published to stage 3\n", prev.JobID)
    }
}

func analyze(raw json.RawMessage) json.RawMessage {
    // Your analysis logic here
    result := map[string]any{
        "analyzed": true,
        "findings": []string{"pattern_a detected", "anomaly in sector 7"},
    }
    data, _ := json.Marshal(result)
    return data
}

Each agent is independent. Agent 2 does not know about Agent 3; it just publishes a completion event. If Agent 3 is down, the event is lost (this is fire-and-forget pub/sub, not a durable queue). When Agent 3 comes back up and resubscribes, it receives only events published from then on. For many agent workflows, especially LLM pipelines where results are non-deterministic anyway, this is fine.
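Because every stage follows the same subscribe-then-publish pattern, the topic wiring for all four agents can be derived from the stage number alone. The helper names below (completeTopic, wiring) are hypothetical; only the pipeline.stageN.complete naming comes from the pipeline described above.

```go
package main

import "fmt"

const lastStage = 4 // the pipeline above has four agents

// completeTopic returns the completion topic for a stage, following
// the pipeline.stageN.complete naming used in this article.
func completeTopic(stage int) string {
	return fmt.Sprintf("pipeline.stage%d.complete", stage)
}

// wiring returns the topic a stage subscribes to and the topic it
// publishes. An empty string means the stage has no such topic:
// stage 1 has no upstream topic, and the last stage publishes nothing.
func wiring(stage int) (subscribe, publish string) {
	if stage > 1 {
		subscribe = completeTopic(stage - 1)
	}
	if stage < lastStage {
		publish = completeTopic(stage)
	}
	return subscribe, publish
}

func main() {
	for stage := 1; stage <= lastStage; stage++ {
		sub, pub := wiring(stage)
		fmt.Printf("agent %d: subscribe=%q publish=%q\n", stage, sub, pub)
	}
}
```

Keeping the naming rule in one function means a renamed topic hierarchy changes in one place rather than in every agent.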

Branching Pipelines

The wildcard subscription makes branching trivial. Suppose stage 2 can produce two kinds of output: normal results and anomalies. Publish to different sub-topics:

// Normal result
stream.Publish("pipeline.stage2.result.normal", data)

// Anomaly detected
stream.Publish("pipeline.stage2.result.anomaly", data)

An agent that handles all results subscribes to pipeline.stage2.result.*. An agent that only handles anomalies subscribes to pipeline.stage2.result.anomaly. The topic hierarchy is the routing logic. No broker configuration needed.
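To make the routing rule concrete, here is a minimal matcher sketch. It assumes a trailing * matches any remaining suffix of the topic, which is what the examples in this article (metrics.*, pipeline.*, and *) suggest. Pilot's actual matching rules are not specified here and may differ, so treat matchTopic as an illustration rather than the daemon's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches pattern.
// ASSUMPTION: a trailing "*" matches any remaining suffix; every
// other pattern must equal the topic exactly.
func matchTopic(pattern, topic string) bool {
	if strings.HasSuffix(pattern, "*") {
		prefix := strings.TrimSuffix(pattern, "*")
		return strings.HasPrefix(topic, prefix)
	}
	return pattern == topic
}

func main() {
	patterns := []string{
		"pipeline.stage2.result.*",
		"pipeline.stage2.result.anomaly",
	}
	topics := []string{
		"pipeline.stage2.result.normal",
		"pipeline.stage2.result.anomaly",
	}
	for _, t := range topics {
		for _, p := range patterns {
			fmt.Printf("%-32s vs %-32s %v\n", t, p, matchTopic(p, t))
		}
	}
}
```

Under this assumption the anomaly event reaches both subscribers, while the normal result reaches only the agent subscribed to pipeline.stage2.result.*.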

CLI Equivalent for Quick Testing

You do not need to write Go to test the event stream. The pilotctl CLI has publish and subscribe commands for quick prototyping.

# Terminal 1: subscribe to all metrics
pilotctl subscribe "metrics.*"

# Terminal 2: publish a CPU metric
pilotctl publish metrics.cpu '{"host":"agent-a","percent":73.2}'

# Terminal 3: publish a memory metric
pilotctl publish metrics.memory '{"host":"agent-b","alloc_mb":256}'

Terminal 1 will print both events as they arrive. Use this to test your topic hierarchy before writing Go code.

# Subscribe to everything (useful for debugging)
pilotctl subscribe "*"

# Subscribe to all pipeline events
pilotctl subscribe "pipeline.*"

# Subscribe to only anomalies
pilotctl subscribe "pipeline.stage2.result.anomaly"

The CLI commands connect to the same daemon and use the same event stream as the Go API. They are interchangeable.

Performance Characteristics

How does Pilot's event stream compare to a real broker on latency and throughput? We measured with two agents in the same region (us-east1):

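| Metric          | Pilot Event Stream | Redis Pub/Sub | Kafka        |
|-----------------|--------------------|---------------|--------------|
| Latency (p50)   | 0.3 ms             | 0.2 ms        | 2 ms         |
| Latency (p99)   | 1.2 ms             | 0.8 ms        | 8 ms         |
| Throughput      | 45K msg/s          | 120K msg/s    | 80K msg/s    |
| Memory (broker) | 0 MB (no broker)   | 50 MB         | 500 MB+      |
| NAT traversal   | Built-in           | No            | No           |
| Encryption      | AES-256-GCM        | Optional TLS  | Optional TLS |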

Pilot is slower on raw throughput because it is peer-to-peer (each publish goes directly to each subscriber) rather than going through an optimized broker process. For agent workloads where you are publishing 10-100 events per second, not 100,000, this is irrelevant. Latency is within half a millisecond of Redis and lower than Kafka, and there is no broker to deploy or operate.
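Since each publish goes directly to each subscriber, a publisher's outgoing load grows with the subscriber count. A back-of-the-envelope sketch, assuming the 45K msg/s figure from the table bounds one publisher's total sends (the benchmark description does not say exactly what it bounds) and using a hypothetical workload of 100 events per second to 20 subscribers:

```go
package main

import "fmt"

// sendsPerSecond estimates a publisher's outgoing sends per second
// when every event is delivered directly to every subscriber.
func sendsPerSecond(eventsPerSecond, subscribers int) int {
	return eventsPerSecond * subscribers
}

// headroom returns the fraction of a measured ceiling left unused.
func headroom(load, ceiling int) float64 {
	return 1 - float64(load)/float64(ceiling)
}

func main() {
	const measuredCeiling = 45_000 // msg/s, from the table above

	// Hypothetical workload: 100 events/s fanned out to 20 subscribers.
	load := sendsPerSecond(100, 20)
	fmt.Printf("%d sends/s, %.1f%% headroom\n",
		load, headroom(load, measuredCeiling)*100)
}
```

At that rate the publisher makes 2,000 sends per second, a small fraction of the measured figure. The same arithmetic shows where peer-to-peer fan-out starts to strain: the product of event rate and subscriber count, not the event rate alone.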

When You Still Need Kafka

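Honesty matters. Pilot's event stream is not a replacement for Kafka in all scenarios. You still need a real broker when you need durable delivery that survives a subscriber being offline, or sustained throughput well beyond what peer-to-peer fan-out can handle.

But here is the question to ask yourself: does your agent system actually need any of those? Most agent event architectures are low-volume and fire-and-forget: metrics, status updates, and task completion signals.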

For these patterns, 12 lines of Go and zero broker infrastructure is the right answer. Save Kafka for when you actually need Kafka.

Putting It Together

Start with the CLI to prototype your topic hierarchy. Move to Go when you need programmatic control. Use wildcard subscriptions to build fan-in aggregators. Use topic hierarchies to build branching pipelines. And when your agent count grows past the point where peer-to-peer pub/sub is efficient, add a broker for the high-volume topics while keeping Pilot for the coordination layer.

The event stream is one piece of Pilot's service stack. Port 1001 handles task submission and execution. Port 80 handles HTTP services. Port 1000 handles stdio-based communication. Together, they give agents a complete communication toolkit without external dependencies.

For the full API reference, see the documentation. For the Go driver source, check the pkg/driver package in the repository.

Try the Event Stream

Two terminals, two commands, zero infrastructure. See events flow between agents in under a minute.
