
Build an Agent Swarm That Self-Organizes via Reputation

February 16, 2026 · tutorial · swarm · polo-score

Most multi-agent systems have an orchestrator. One central process decides who does what. That works until the orchestrator goes down, gets overloaded, or becomes the trust bottleneck. In this tutorial, we build something different: 10 agents that discover each other, establish trust, submit and execute tasks, and self-organize based on reputation. No orchestrator. No central scheduler. The swarm figures it out.

By the end, you will have a working Python codebase that spawns agents, lets them find peers via Pilot Protocol's registry and exchange tasks through the built-in task lifecycle, and lets you watch as the polo score naturally routes the most work to the best-performing agents.

What We Are Building

The system has three key properties: decentralized discovery, reputation-weighted routing, and no single point of failure.

The architecture relies on three Pilot Protocol features: the registry for discovery, the task service (port 1001) for work distribution, and the polo score for reputation tracking.

Prerequisites

To follow along you will need:

  - The pilotctl and pilot-daemon binaries on your PATH
  - Python 3.8+ with the openai package installed
  - An OPENAI_API_KEY environment variable (for the LLM execution step)
  - A reachable registry server (the examples use rendezvous.example.com:9000)

Agent Architecture

Each agent is a Python script that wraps pilotctl via subprocess calls. This is intentional. Pilot Protocol's CLI is the stable interface. You do not need Go bindings or a Python SDK. If you can call a subprocess, you can participate in the network.

The Agent Class

import subprocess
import json
import time
import random
import os

class SwarmAgent:
    def __init__(self, agent_id, role, registry_addr):
        self.agent_id = agent_id
        self.role = role
        self.registry_addr = registry_addr
        self.hostname = f"swarm-{role}-{agent_id}"
        self.peers = {}
        self.tasks_completed = 0
        self.polo_score = 0.0

    def pilotctl(self, *args):
        """Run a pilotctl command and return parsed output."""
        cmd = ["pilotctl"] + list(args)
        result = subprocess.run(
            cmd, capture_output=True, text=True, timeout=30
        )
        if result.returncode != 0:
            raise RuntimeError(f"pilotctl failed: {result.stderr}")
        return result.stdout.strip()

    def start_daemon(self):
        """Start the Pilot daemon for this agent."""
        subprocess.Popen([
            "pilot-daemon",
            "-registry-addr", self.registry_addr,
            "-beacon-addr", self.registry_addr.replace(":9000", ":9001"),
        ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        time.sleep(2)  # Wait for daemon startup + STUN

    def register(self):
        """Register hostname and set agent tags."""
        self.pilotctl("set-hostname", self.hostname)
        self.pilotctl(
            "set-tags",
            f"role={self.role},swarm=demo,capacity=medium"
        )
        # Enable task acceptance
        self.pilotctl("task-opt-in")
        print(f"[{self.hostname}] Registered with tags")

Each agent starts its own daemon, registers a hostname, sets descriptive tags, and opts into the task system. The task-opt-in command tells the registry that this agent is willing to accept and execute tasks from trusted peers.

Discovering Peers

Agents find each other through the registry. No hardcoded addresses, no configuration files, no service mesh. You query by tag and get back a list of matching agents.

    def discover_peers(self):
        """Find other agents in the swarm by tag."""
        output = self.pilotctl("lookup", "--tag", "swarm=demo")
        peers = json.loads(output)
        for peer in peers:
            addr = peer["address"]
            if addr != self.my_address():
                self.peers[addr] = {
                    "hostname": peer.get("hostname", "unknown"),
                    "role": peer.get("tags", {}).get("role", "unknown"),
                    "polo_score": peer.get("polo_score", 0.0),
                    "trusted": False,
                }
        print(f"[{self.hostname}] Discovered {len(self.peers)} peers")
        return self.peers

    def my_address(self):
        """Get this agent's Pilot address."""
        output = self.pilotctl("status")
        status = json.loads(output)
        return status["address"]

The lookup --tag command queries the registry for all agents with matching tags. Each result includes the agent's virtual address, hostname, tags, and current polo score. This is how the swarm maintains awareness without an orchestrator -- every agent can ask the registry "who else is in my swarm?" at any time.
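For reference, the response shape the code above expects from lookup --tag looks roughly like this. This is a hypothetical example: the field names are inferred from the parsing code, not taken from the Pilot Protocol docs.

```json
[
  {
    "address": "10.77.0.12",
    "hostname": "swarm-analyzer-2",
    "tags": { "role": "analyzer", "swarm": "demo" },
    "polo_score": 41.0
  },
  {
    "address": "10.77.0.13",
    "hostname": "swarm-reviewer-4",
    "tags": { "role": "reviewer", "swarm": "demo" },
    "polo_score": 38.0
  }
]
```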

Establishing Trust

Discovery is not the same as trust. By default, agents are invisible to each other at the connection level: discovery tells you a peer exists, but before an agent can send tasks to that peer, the two must first complete a mutual Ed25519 handshake.

    def establish_trust(self):
        """Initiate handshake with all discovered peers."""
        for addr, info in self.peers.items():
            if info["trusted"]:
                continue
            try:
                self.pilotctl("handshake", addr)
                self.peers[addr]["trusted"] = True
                print(f"[{self.hostname}] Trusted: {info['hostname']}")
            except RuntimeError as e:
                print(f"[{self.hostname}] Handshake failed with {addr}: {e}")

The handshake is mutual. Both sides sign a challenge with their Ed25519 identity key. If either side rejects (or if the agent has not opted into the handshake), the connection fails. This means an agent can refuse trust from unknown peers, and a compromised agent can be revoked from the network by removing its trust entries.

The Task Lifecycle

Pilot Protocol has a built-in task lifecycle on port 1001 (data exchange). The flow is: submit, accept/decline, execute, return results. Here is how each step works.
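As a mental model, the lifecycle can be sketched as a small state machine. This is a sketch only; the actual daemon may track additional states.

```python
from enum import Enum

class TaskState(Enum):
    SUBMITTED = "submitted"
    ACCEPTED = "accepted"
    DECLINED = "declined"
    COMPLETED = "completed"
    FAILED = "failed"

# Legal transitions in the submit -> accept/decline -> execute -> result flow
TRANSITIONS = {
    TaskState.SUBMITTED: {TaskState.ACCEPTED, TaskState.DECLINED},
    TaskState.ACCEPTED: {TaskState.COMPLETED, TaskState.FAILED},
}

def can_transition(src, dst):
    """True if the lifecycle allows moving from state src to state dst."""
    return dst in TRANSITIONS.get(src, set())

print(can_transition(TaskState.SUBMITTED, TaskState.ACCEPTED))   # True
print(can_transition(TaskState.SUBMITTED, TaskState.COMPLETED))  # False
```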

Submitting Tasks

    def submit_task(self, target_addr, task_data):
        """Submit a task to a peer agent."""
        task_json = json.dumps({
            "type": task_data["type"],
            "payload": task_data["payload"],
            "timeout_seconds": 120,
            "submitter": self.my_address(),
        })

        output = self.pilotctl(
            "task", "submit",
            "--to", target_addr,
            "--data", task_json,
        )
        task_id = json.loads(output)["task_id"]
        print(f"[{self.hostname}] Submitted task {task_id} to {target_addr}")
        return task_id

Choosing the Best Peer

This is where the self-organization begins. Instead of randomly assigning tasks, each agent chooses the peer with the highest polo score for the requested role.

    def select_best_peer(self, role):
        """Select the highest-reputation peer for a given role."""
        candidates = [
            (addr, info) for addr, info in self.peers.items()
            if info["role"] == role and info["trusted"]
        ]
        if not candidates:
            return None

        # Sort by polo score, highest first
        candidates.sort(key=lambda x: x[1]["polo_score"], reverse=True)

        # Add randomness: 80% chance of best, 20% exploration
        if random.random() < 0.2 and len(candidates) > 1:
            return random.choice(candidates[1:])[0]

        return candidates[0][0]

The 80/20 split is important. If agents always choose the top-scoring peer, new agents never get a chance to build reputation. The 20% exploration rate gives newcomers work, while still routing most tasks to proven performers.
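You can check the split empirically. This standalone sketch mirrors the selection logic of select_best_peer over a fixed peer list and measures how often the top scorer is chosen (peer names and scores are invented for the demo):

```python
import random

def select(candidates, epsilon=0.2):
    """Mirror of select_best_peer: exploit the top score, explore 20%."""
    ranked = sorted(candidates, key=lambda c: c[1], reverse=True)
    if random.random() < epsilon and len(ranked) > 1:
        return random.choice(ranked[1:])[0]
    return ranked[0][0]

random.seed(0)
peers = [("peer-a", 40.0), ("peer-b", 20.0), ("peer-c", 5.0)]
picks = [select(peers) for _ in range(10_000)]
print(f"top peer share: {picks.count('peer-a') / len(picks):.2f}")  # ~0.80
```

The top scorer lands close to 80% of selections; the remaining 20% is spread over the rest of the pool, which is exactly the newcomer on-ramp described above.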

Executing Tasks with LLMs

    def listen_for_tasks(self):
        """Poll for incoming tasks and execute them."""
        while True:
            task_id = None
            try:
                output = self.pilotctl("task", "poll", "--timeout", "10")
                if not output:
                    continue

                task = json.loads(output)
                task_id = task["task_id"]

                print(f"[{self.hostname}] Received task {task_id}")

                # Accept the task
                self.pilotctl("task", "accept", "--id", task_id)

                # Execute with LLM
                start_time = time.time()
                result = self.execute_with_llm(task["data"])
                elapsed = time.time() - start_time

                # Return results
                self.pilotctl(
                    "task", "complete",
                    "--id", task_id,
                    "--result", json.dumps(result),
                    "--cpu-minutes", str(round(elapsed / 60, 4)),
                )

                self.tasks_completed += 1
                print(f"[{self.hostname}] Completed task {task_id} "
                      f"in {elapsed:.1f}s")

            except subprocess.TimeoutExpired:
                continue
            except Exception as e:
                print(f"[{self.hostname}] Task error: {e}")
                # Only report failure if a task was actually received
                if task_id is not None:
                    self.pilotctl("task", "fail", "--id", task_id,
                                  "--error", str(e))

The LLM Execution Function

    def execute_with_llm(self, task_data):
        """Call an LLM to process the task."""
        import openai

        client = openai.OpenAI(api_key=os.environ["OPENAI_API_KEY"])

        task_type = task_data.get("type", "general")
        payload = task_data.get("payload", "")

        prompts = {
            "summarize": f"Summarize this text concisely:\n\n{payload}",
            "analyze": f"Analyze this data and provide key insights:\n\n{payload}",
            "code_review": f"Review this code for bugs and improvements:\n\n{payload}",
            "translate": f"Translate this to English:\n\n{payload}",
            "general": f"Process this request:\n\n{payload}",
        }

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": f"You are a {task_type} specialist agent."},
                {"role": "user", "content": prompts.get(task_type, prompts["general"])},
            ],
            max_tokens=1024,
        )

        return {
            "output": response.choices[0].message.content,
            "model": "gpt-4o-mini",
            "tokens_used": response.usage.total_tokens,
        }

Each agent is a specialist. One handles summarization, another does code review, another translates. The task type in the submission determines which prompt template is used. But the routing decision -- which agent gets the task -- is based entirely on polo score.

The Polo Score Formula

The polo score is how Pilot Protocol tracks reputation without a blockchain, without staking, and without tokens. It is computed from actual behavior: tasks completed, CPU time contributed, and execution efficiency.

The Formula

# Polo score reward for a completed task
reward = round(1 + log2(1 + cpu_minutes)) * efficiency

Where:

  - cpu_minutes is the CPU time the worker reports on completion (the --cpu-minutes flag in the examples above)
  - efficiency is a multiplier between 0 and 1 derived from the agent's recent success rate (1.0 for a fully reliable agent)

Walk Through Examples

Let us trace through three scenarios to see how the score evolves.

Scenario 1: Quick summarization task. Agent completes a text summarization in 8 seconds (0.133 CPU minutes). Efficiency is 1.0 (no failures).

reward = round(1 + log2(1 + 0.133)) * 1.0
       = round(1 + log2(1.133))
       = round(1 + 0.180)
       = round(1.180)
       = 1

One point. Small tasks earn small rewards. This prevents agents from spamming trivial tasks to inflate their score.

Scenario 2: Heavy code analysis. Agent spends 5 CPU minutes analyzing a large codebase. Efficiency is 1.0.

reward = round(1 + log2(1 + 5.0)) * 1.0
       = round(1 + log2(6.0))
       = round(1 + 2.585)
       = round(3.585)
       = 4

Four points. More work earns more, but the logarithmic curve means 5 minutes of CPU earns 4x a quick task, not 40x.

Scenario 3: Unreliable agent. Same 5-minute task, but this agent has failed 3 out of its last 10 tasks. Efficiency drops to 0.7.

reward = round(1 + log2(1 + 5.0)) * 0.7
       = round(3.585) * 0.7
       = 4 * 0.7
       = 2.8  # (truncated to 2)

Two points instead of four. Unreliability has a direct cost. Over time, unreliable agents accumulate score slower and get selected less often, creating a feedback loop that pushes them out of the high-value task pool.
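The three scenarios above can be reproduced in a few lines of Python, assuming (as the walkthroughs do) that the result is truncated to an integer after the efficiency multiplier is applied:

```python
import math

def polo_reward(cpu_minutes: float, efficiency: float = 1.0) -> int:
    """reward = round(1 + log2(1 + cpu_minutes)) * efficiency, truncated."""
    return int(round(1 + math.log2(1 + cpu_minutes)) * efficiency)

print(polo_reward(0.133))      # 1 -- quick summarization
print(polo_reward(5.0))        # 4 -- heavy code analysis
print(polo_reward(5.0, 0.7))   # 2 -- same work, unreliable agent
```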

The Emergence: Self-Organization in Action

Now we wire it all together. Here is the main loop that each agent runs.

import threading

def run_agent(agent_id, role, registry_addr):
    agent = SwarmAgent(agent_id, role, registry_addr)
    agent.start_daemon()
    agent.register()

    time.sleep(5)  # Let other agents register

    agent.discover_peers()
    agent.establish_trust()

    # Start task listener in background
    listener = threading.Thread(
        target=agent.listen_for_tasks, daemon=True
    )
    listener.start()

    # Main loop: periodically submit tasks to peers
    task_types = ["summarize", "analyze", "code_review", "translate"]

    while True:
        # Refresh peer info (polo scores change over time)
        agent.discover_peers()

        # Pick a task type and find the best peer for it
        task_type = random.choice(task_types)
        role_for_task = {
            "summarize": "summarizer",
            "analyze": "analyzer",
            "code_review": "reviewer",
            "translate": "translator",
        }.get(task_type, "general")

        target = agent.select_best_peer(role_for_task)
        if target is None:
            time.sleep(5)
            continue

        # Submit the task
        try:
            agent.submit_task(target, {
                "type": task_type,
                "payload": generate_sample_payload(task_type),
            })
        except Exception as e:
            print(f"[{agent.hostname}] Submit failed: {e}")

        time.sleep(random.uniform(10, 30))  # Stagger submissions
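The loop calls generate_sample_payload, which is left undefined above. A minimal stand-in might look like this; the sample inputs are invented for illustration:

```python
def generate_sample_payload(task_type):
    """Return a small sample input for each task type (illustrative only)."""
    samples = {
        "summarize": "Pilot Protocol agents discover peers via a registry...",
        "analyze": '{"latency_ms": [12, 15, 11, 94, 13]}',
        "code_review": "def add(a, b):\n    return a - b",
        "translate": "Bonjour tout le monde",
    }
    return samples.get(task_type, "Process this sample request.")

print(generate_sample_payload("translate"))  # Bonjour tout le monde
```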

Spawning the Swarm

def main():
    registry_addr = "rendezvous.example.com:9000"

    # Define the swarm: 10 agents with different roles
    agents = [
        (0, "summarizer"),
        (1, "summarizer"),
        (2, "analyzer"),
        (3, "analyzer"),
        (4, "reviewer"),
        (5, "reviewer"),
        (6, "translator"),
        (7, "translator"),
        (8, "general"),    # Generalist: accepts any task type
        (9, "general"),
    ]

    threads = []
    for agent_id, role in agents:
        t = threading.Thread(
            target=run_agent,
            args=(agent_id, role, registry_addr),
        )
        t.start()
        threads.append(t)
        time.sleep(1)  # Stagger daemon startups

    for t in threads:
        t.join()

if __name__ == "__main__":
    main()

What Happens Over Time

Run this for 30 minutes and watch the polo scores. Here is what typically emerges:

  1. Minutes 0-5: All agents have polo score 0. Task routing is essentially random: with every score tied at zero, even the 80% "exploit" pick is arbitrary.
  2. Minutes 5-10: Some agents complete tasks faster than others. Maybe swarm-summarizer-0 has lower latency to the LLM API. It accumulates score faster and starts getting 80% of summarization tasks.
  3. Minutes 10-20: The score gap widens. High-scoring agents get more work, complete more tasks, and earn more score. A positive feedback loop forms. But the 20% exploration rate keeps the second-tier agents in the game.
  4. Minutes 20-30: The swarm has self-organized. Each role has a clear "preferred" agent and a backup. If the preferred agent goes down, the backup's exploration-earned score makes it the immediate replacement. No failover logic needed.

This is the key insight: The swarm does not need an orchestrator because reputation is the orchestrator. The polo score encodes "who is good at what" as a number. The routing algorithm uses that number. Emergence happens automatically.
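The feedback loop is easy to see in a toy simulation. This is a sketch under invented parameters: two same-role agents, one with a lower efficiency multiplier, routed by the same 80/20 rule from select_best_peer.

```python
import math
import random

def reward(cpu_minutes, efficiency=1.0):
    """Polo reward as described above, truncated to an integer."""
    return int(round(1 + math.log2(1 + cpu_minutes)) * efficiency)

random.seed(7)
scores = {"A": 0.0, "B": 0.0}
efficiency = {"A": 1.0, "B": 0.7}   # B fails more often

for _ in range(200):                # route 200 one-minute tasks
    ranked = sorted(scores, key=scores.get, reverse=True)
    # 80% exploit the current leader, 20% explore the runner-up
    pick = ranked[1] if random.random() < 0.2 else ranked[0]
    scores[pick] += reward(1.0, efficiency[pick])

print(scores)  # A pulls far ahead of B
```

The leader earns more per task and receives more tasks, so the gap compounds, while the 20% exploration keeps B's score slowly growing as a viable backup.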

Monitoring the Swarm

You can watch the self-organization happen in real time by polling the registry for polo scores.

def monitor_swarm(registry_addr):
    """Print polo score leaderboard every 30 seconds."""
    while True:
        output = subprocess.run(
            ["pilotctl", "lookup", "--tag", "swarm=demo"],
            capture_output=True, text=True
        ).stdout
        agents = json.loads(output)

        # Sort by polo score
        agents.sort(key=lambda a: a.get("polo_score", 0), reverse=True)

        print("\n--- Swarm Leaderboard ---")
        print(f"{'Hostname':<25} {'Role':<12} {'Score':<8} {'Tasks':<8}")
        print("-" * 55)
        for a in agents:
            print(f"{a.get('hostname', 'unknown'):<25} "
                  f"{a.get('tags', {}).get('role', '?'):<12} "
                  f"{a.get('polo_score', 0):<8.1f} "
                  f"{a.get('tasks_completed', 0):<8}")

        time.sleep(30)

After 30 minutes, the leaderboard might look like this:

--- Swarm Leaderboard ---
Hostname                  Role         Score    Tasks
-------------------------------------------------------
swarm-summarizer-0        summarizer   47.0     38
swarm-analyzer-2          analyzer     41.0     29
swarm-reviewer-4          reviewer     38.0     31
swarm-translator-7        translator   35.0     27
swarm-general-8           general      28.0     22
swarm-summarizer-1        summarizer   19.0     16
swarm-analyzer-3          analyzer     17.0     14
swarm-reviewer-5          reviewer     15.0     13
swarm-translator-6        translator   12.0     10
swarm-general-9           general      9.0      8

The top agent in each role has roughly 2-3x the score of the backup. The swarm has organized itself into primary and secondary workers without any explicit assignment.

Scaling to 100 Agents

Ten agents is a demo. Can this approach scale? We tested with 100 agents on 5 VMs (20 agents per VM) and found two things that matter.

Memory: 10 MB Per Daemon

Each Pilot daemon uses approximately 10 MB of RSS. On a 16 GB VM, you can run 200+ daemons comfortably. The registry handles 100 concurrent agents without measurable latency increase. The benchmark data confirms the per-connection memory stays flat.

# Total RSS across 100 daemons on a single VM ($6 is RSS in KB)
ps aux | grep "[p]ilot-daemon" | awk '{sum += $6} END {print sum/1024 "MB"}'
1024MB  # ~10 MB per daemon average

Polo Gate: Preventing Low-Rep Spam

At 100 agents, a new problem appears: low-reputation agents can spam task submissions to high-value workers, consuming their cycles with junk work. The polo gate prevents this.

# Agents can set a minimum polo score for incoming tasks
pilotctl set-polo-gate --min-score 10

With a polo gate of 10, new agents (score 0) cannot submit tasks to high-value workers. They must first build reputation by completing tasks from other new agents or from established agents that explicitly submit to them. This creates a natural onboarding ramp:

  1. New agent joins with score 0
  2. Picks up tasks from other low-score agents (or via the 20% exploration rate)
  3. Builds score to 10+
  4. Can now submit tasks to high-value workers

This mirrors how reputation works in real economies. You start small, prove yourself, and earn access to higher-value networks.
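The onboarding ramp can be modeled in a few lines. The gate thresholds and per-task rewards below are invented for illustration; the point is that a score-0 agent climbs through progressively higher gates.

```python
def reachable(score, peers):
    """Peers whose polo gate the submitter's score clears."""
    return [p for p in peers if score >= p["gate"]]

peers = [
    {"name": "newbie-pool", "gate": 0,  "reward": 1},
    {"name": "mid-tier",    "gate": 5,  "reward": 2},
    {"name": "high-value",  "gate": 10, "reward": 4},
]

score, rounds = 0, 0
while score < 10:                   # climb until the gate of 10 clears
    best = max(reachable(score, peers), key=lambda p: p["reward"])
    score += best["reward"]
    rounds += 1

print(rounds, score)  # 8 11
```

Early rounds are all low-value work; as gates open, per-task rewards grow, so the ramp steepens on its own.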

Network Topology at Scale

With 100 agents, the trust mesh becomes important. Full mesh (every agent trusts every other) means 4,950 handshake pairs. That is expensive. In practice, agents should trust only peers they actually interact with. The swarm naturally converges on a sparse trust graph where each agent trusts 10-20 peers based on role affinity.

    def selective_trust(self):
        """Only trust peers in roles we submit tasks to."""
        # get_target_roles() returns the set of roles this agent
        # submits work to (e.g. derived from its task mix).
        needed_roles = self.get_target_roles()
        for addr, info in self.peers.items():
            if info["role"] in needed_roles and not info["trusted"]:
                try:
                    self.pilotctl("handshake", addr)
                    self.peers[addr]["trusted"] = True
                except RuntimeError:
                    continue  # Peer declined or unreachable; skip it

Adding Fault Tolerance

A real swarm needs to handle agent failures. Pilot makes this straightforward because the registry tracks liveness via keepalive probes.

    def submit_with_retry(self, task_data, role, max_retries=3):
        """Submit a task with automatic failover to next-best peer."""
        for attempt in range(max_retries):
            target = self.select_best_peer(role)
            if target is None:
                print(f"[{self.hostname}] No peers available for {role}")
                return None

            try:
                task_id = self.submit_task(target, task_data)
                # Wait for result with timeout
                result = self.wait_for_result(task_id, timeout=120)
                return result
            except Exception as e:
                print(f"[{self.hostname}] Attempt {attempt+1} failed: {e}")
                # Mark peer as temporarily unreliable
                self.peers[target]["polo_score"] *= 0.5
                continue

        return None

    def wait_for_result(self, task_id, timeout=120):
        """Poll for task completion."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            output = self.pilotctl("task", "status", "--id", task_id)
            status = json.loads(output)
            if status["state"] == "completed":
                return status["result"]
            if status["state"] == "failed":
                raise RuntimeError(f"Task failed: {status.get('error')}")
            time.sleep(2)
        raise TimeoutError(f"Task {task_id} timed out")

When a task fails, the agent halves the peer's local polo score estimate and retries with the next-best peer. The real polo score in the registry also degrades because the failed task hurts the peer's efficiency multiplier. Over time, unreliable agents sink to the bottom of the selection pool.

The Complete Agent Script

Here is the full script that ties everything together. Save it as swarm_agent.py and run with the agent ID and role as arguments.

#!/usr/bin/env python3
"""Pilot Protocol swarm agent with reputation-based self-organization."""

import subprocess
import json
import time
import random
import threading
import os
import sys
import math

REGISTRY_ADDR = os.environ.get("REGISTRY_ADDR", "rendezvous.example.com:9000")

class SwarmAgent:
    def __init__(self, agent_id, role):
        self.agent_id = agent_id
        self.role = role
        self.hostname = f"swarm-{role}-{agent_id}"
        self.peers = {}
        self.tasks_completed = 0

    def pilotctl(self, *args):
        cmd = ["pilotctl"] + list(args)
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if result.returncode != 0:
            raise RuntimeError(result.stderr)
        return result.stdout.strip()

    def start(self):
        subprocess.Popen([
            "pilot-daemon",
            "-registry-addr", REGISTRY_ADDR,
            "-beacon-addr", REGISTRY_ADDR.replace(":9000", ":9001"),
        ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        time.sleep(2)
        self.pilotctl("set-hostname", self.hostname)
        self.pilotctl("set-tags", f"role={self.role},swarm=demo")
        self.pilotctl("task-opt-in")

    def discover(self):
        output = self.pilotctl("lookup", "--tag", "swarm=demo")
        my_addr = self.my_address()
        for peer in json.loads(output):
            addr = peer["address"]
            if addr != my_addr:
                self.peers[addr] = {
                    "hostname": peer.get("hostname", "?"),
                    "role": peer.get("tags", {}).get("role", "?"),
                    "polo_score": peer.get("polo_score", 0),
                    "trusted": self.peers.get(addr, {}).get("trusted", False),
                }

    def my_address(self):
        return json.loads(self.pilotctl("status"))["address"]

    def trust_peers(self):
        for addr, info in self.peers.items():
            if not info["trusted"]:
                try:
                    self.pilotctl("handshake", addr)
                    info["trusted"] = True
                except RuntimeError:
                    pass

    def best_peer(self, role):
        candidates = [
            (a, i) for a, i in self.peers.items()
            if i["role"] == role and i["trusted"]
        ]
        if not candidates:
            return None
        candidates.sort(key=lambda x: x[1]["polo_score"], reverse=True)
        if random.random() < 0.2 and len(candidates) > 1:
            return random.choice(candidates[1:])[0]
        return candidates[0][0]

    def run(self):
        self.start()
        time.sleep(5)
        self.discover()
        self.trust_peers()
        threading.Thread(target=self.listen, daemon=True).start()

        roles = ["summarizer", "analyzer", "reviewer", "translator"]
        while True:
            self.discover()
            role = random.choice(roles)
            target = self.best_peer(role)
            if target:
                try:
                    self.pilotctl("task", "submit", "--to", target,
                        "--data", json.dumps({"type": role, "payload": "sample"}))
                except RuntimeError:
                    pass
            time.sleep(random.uniform(10, 30))

    def listen(self):
        while True:
            try:
                output = self.pilotctl("task", "poll", "--timeout", "10")
                if not output:
                    continue
                task = json.loads(output)
                self.pilotctl("task", "accept", "--id", task["task_id"])
                start = time.time()
                # Minimal executor: echo a result (swap in an LLM call here)
                result = {"output": f"Processed by {self.hostname}"}
                elapsed = time.time() - start
                self.pilotctl("task", "complete", "--id", task["task_id"],
                    "--result", json.dumps(result),
                    "--cpu-minutes", str(round(elapsed / 60, 4)))
                self.tasks_completed += 1
            except Exception:
                continue

if __name__ == "__main__":
    if len(sys.argv) != 3:
        sys.exit("usage: swarm_agent.py <agent_id> <role>")
    agent_id = int(sys.argv[1])
    role = sys.argv[2]
    SwarmAgent(agent_id, role).run()

Run 10 instances:

python swarm_agent.py 0 summarizer &
python swarm_agent.py 1 summarizer &
python swarm_agent.py 2 analyzer &
python swarm_agent.py 3 analyzer &
python swarm_agent.py 4 reviewer &
python swarm_agent.py 5 reviewer &
python swarm_agent.py 6 translator &
python swarm_agent.py 7 translator &
python swarm_agent.py 8 general &
python swarm_agent.py 9 general &

What You Have Built

This is not a toy demo. The patterns here -- discovery, trust, task delegation, reputation-based routing, fault tolerance -- are the building blocks of production multi-agent systems. The difference from conventional architectures is what is missing: no orchestrator, no message queue, no service mesh, no load balancer, no health check infrastructure.

The swarm self-organizes because the incentive structure is correct. Agents that do good work earn reputation. Reputation earns more work. More work earns more reputation. Bad actors see their efficiency multiplier decay, their score stagnate, and their task flow dry up. The system is self-correcting.

For a deeper look at the polo score design and its gaming resistance properties, read The Polo Score: Designing a Reputation System Without Blockchain. For the network fundamentals, see How Pilot Protocol Works.

Build Your Own Swarm

Everything in this tutorial runs on the open-source Pilot Protocol. Clone the repo and start swarming.

View on GitHub