Build an Agent Swarm That Self-Organizes via Reputation
Most multi-agent systems have an orchestrator. One central process decides who does what. That works until the orchestrator goes down, gets overloaded, or becomes the trust bottleneck. In this tutorial, we build something different: 10 agents that discover each other, establish trust, submit and execute tasks, and self-organize based on reputation. No orchestrator. No central scheduler. The swarm figures it out.
By the end, you will have a working Python codebase that spawns agents, lets them find peers via Pilot Protocol's registry, exchange tasks through the built-in task lifecycle, and watch as the polo score naturally causes the best-performing agents to attract the most work.
What We Are Building
The system has these properties:
- 10 agents, each running as a separate process with its own Pilot daemon
- Peer discovery via the registry, using hostname and tag-based lookups
- Mutual trust established through Ed25519 handshakes (no pre-shared keys)
- Task submission and execution using Pilot's built-in task lifecycle
- LLM integration for task execution (each agent calls an LLM to process tasks)
- Polo score accumulation that determines which agents get more work over time
- No central orchestrator -- every agent runs the same code and makes its own decisions
The architecture relies on three Pilot Protocol features: the registry for discovery, the task service (port 1001) for work distribution, and the polo score for reputation tracking.
Prerequisites
- Python 3.10+
- Pilot Protocol installed (go install github.com/TeoSlayer/pilotprotocol/cmd/...)
- A running rendezvous server (or use the public one for testing)
- An OpenAI API key (or any LLM API) for task execution
Agent Architecture
Each agent is a Python script that wraps pilotctl via subprocess calls. This is intentional. Pilot Protocol's CLI is the stable interface. You do not need Go bindings or a Python SDK. If you can call a subprocess, you can participate in the network.
The Agent Class
```python
import subprocess
import json
import time
import random
import os


class SwarmAgent:
    def __init__(self, agent_id, role, registry_addr):
        self.agent_id = agent_id
        self.role = role
        self.registry_addr = registry_addr
        self.hostname = f"swarm-{role}-{agent_id}"
        self.peers = {}
        self.tasks_completed = 0
        self.polo_score = 0.0

    def pilotctl(self, *args):
        """Run a pilotctl command and return parsed output."""
        cmd = ["pilotctl"] + list(args)
        result = subprocess.run(
            cmd, capture_output=True, text=True, timeout=30
        )
        if result.returncode != 0:
            raise RuntimeError(f"pilotctl failed: {result.stderr}")
        return result.stdout.strip()

    def start_daemon(self):
        """Start the Pilot daemon for this agent."""
        subprocess.Popen([
            "pilot-daemon",
            "-registry-addr", self.registry_addr,
            "-beacon-addr", self.registry_addr.replace(":9000", ":9001"),
        ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        time.sleep(2)  # Wait for daemon startup + STUN

    def register(self):
        """Register hostname and set agent tags."""
        self.pilotctl("set-hostname", self.hostname)
        self.pilotctl(
            "set-tags",
            f"role={self.role},swarm=demo,capacity=medium"
        )
        # Enable task acceptance
        self.pilotctl("task-opt-in")
        print(f"[{self.hostname}] Registered with tags")
```
Each agent starts its own daemon, registers a hostname, sets descriptive tags, and opts into the task system. The task-opt-in command tells the registry that this agent is willing to accept and execute tasks from trusted peers.
Discovering Peers
Agents find each other through the registry. No hardcoded addresses, no configuration files, no service mesh. You query by tag and get back a list of matching agents.
```python
    def discover_peers(self):
        """Find other agents in the swarm by tag."""
        output = self.pilotctl("lookup", "--tag", "swarm=demo")
        peers = json.loads(output)
        for peer in peers:
            addr = peer["address"]
            if addr != self.my_address():
                self.peers[addr] = {
                    "hostname": peer.get("hostname", "unknown"),
                    "role": peer.get("tags", {}).get("role", "unknown"),
                    "polo_score": peer.get("polo_score", 0.0),
                    # Preserve trust state across refreshes
                    "trusted": self.peers.get(addr, {}).get("trusted", False),
                }
        print(f"[{self.hostname}] Discovered {len(self.peers)} peers")
        return self.peers

    def my_address(self):
        """Get this agent's Pilot address."""
        output = self.pilotctl("status")
        status = json.loads(output)
        return status["address"]
```
The lookup --tag command queries the registry for all agents with matching tags. Each result includes the agent's virtual address, hostname, tags, and current polo score. This is how the swarm maintains awareness without an orchestrator -- every agent can ask the registry "who else is in my swarm?" at any time.
Establishing Trust
Discovery is not the same as trust. By default, agents are invisible to each other: before an agent can send tasks to a peer, the two must complete a mutual Ed25519 handshake.
```python
    def establish_trust(self):
        """Initiate handshake with all discovered peers."""
        for addr, info in self.peers.items():
            if info["trusted"]:
                continue
            try:
                self.pilotctl("handshake", addr)
                self.peers[addr]["trusted"] = True
                print(f"[{self.hostname}] Trusted: {info['hostname']}")
            except RuntimeError as e:
                print(f"[{self.hostname}] Handshake failed with {addr}: {e}")
```
The handshake is mutual. Both sides sign a challenge with their Ed25519 identity key. If either side rejects (or if the agent has not opted into the handshake), the connection fails. This means an agent can refuse trust from unknown peers, and a compromised agent can be revoked from the network by removing its trust entries.
The Task Lifecycle
Pilot Protocol has a built-in task lifecycle on port 1001 (data exchange). The flow is: submit, accept/decline, execute, return results. Here is how each step works.
Submitting Tasks
```python
    def submit_task(self, target_addr, task_data):
        """Submit a task to a peer agent."""
        task_json = json.dumps({
            "type": task_data["type"],
            "payload": task_data["payload"],
            "timeout_seconds": 120,
            "submitter": self.my_address(),
        })
        output = self.pilotctl(
            "task", "submit",
            "--to", target_addr,
            "--data", task_json,
        )
        task_id = json.loads(output)["task_id"]
        print(f"[{self.hostname}] Submitted task {task_id} to {target_addr}")
        return task_id
```
Choosing the Best Peer
This is where the self-organization begins. Instead of randomly assigning tasks, each agent chooses the peer with the highest polo score for the requested role.
```python
    def select_best_peer(self, role):
        """Select the highest-reputation peer for a given role."""
        candidates = [
            (addr, info) for addr, info in self.peers.items()
            if info["role"] == role and info["trusted"]
        ]
        if not candidates:
            return None
        # Sort by polo score, highest first
        candidates.sort(key=lambda x: x[1]["polo_score"], reverse=True)
        # Add randomness: 80% chance of best, 20% exploration
        if random.random() < 0.2 and len(candidates) > 1:
            return random.choice(candidates[1:])[0]
        return candidates[0][0]
```
The 80/20 split is important. If agents always choose the top-scoring peer, new agents never get a chance to build reputation. The 20% exploration rate gives newcomers work, while still routing most tasks to proven performers.
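To make the split concrete, here is a small standalone sketch (not part of Pilot Protocol) computing the long-run fraction of tasks each peer receives under this rule, assuming exploration picks uniformly among the non-best candidates:

```python
def expected_share(rank, n_candidates, explore=0.2):
    """Long-run fraction of tasks routed to the peer at a given
    polo-score rank (0 = highest) under the 80/20 selection rule."""
    if n_candidates == 1:
        return 1.0          # A sole candidate gets everything
    if rank == 0:
        return 1 - explore  # The top peer wins the 80% exploitation path
    # Exploration picks uniformly among the remaining candidates
    return explore / (n_candidates - 1)

# With three trusted summarizers: the leader gets 80% of tasks,
# and each of the two others gets 10% -- enough to build a score.
print([expected_share(r, 3) for r in range(3)])
```

With more candidates the exploration traffic spreads thinner, so a newcomer in a crowded role builds reputation more slowly; raising the exploration rate is the knob that trades routing quality for onboarding speed.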
Executing Tasks with LLMs
```python
    def listen_for_tasks(self):
        """Poll for incoming tasks and execute them."""
        while True:
            task_id = None
            try:
                output = self.pilotctl("task", "poll", "--timeout", "10")
                if not output:
                    continue
                task = json.loads(output)
                task_id = task["task_id"]
                print(f"[{self.hostname}] Received task {task_id}")
                # Accept the task
                self.pilotctl("task", "accept", "--id", task_id)
                # Execute with LLM
                start_time = time.time()
                result = self.execute_with_llm(task["data"])
                elapsed = time.time() - start_time
                # Return results
                self.pilotctl(
                    "task", "complete",
                    "--id", task_id,
                    "--result", json.dumps(result),
                    "--cpu-minutes", str(round(elapsed / 60, 4)),
                )
                self.tasks_completed += 1
                print(f"[{self.hostname}] Completed task {task_id} "
                      f"in {elapsed:.1f}s")
            except subprocess.TimeoutExpired:
                continue
            except Exception as e:
                print(f"[{self.hostname}] Task error: {e}")
                # Only report failure if we got far enough to have a task ID
                if task_id is not None:
                    self.pilotctl("task", "fail", "--id", task_id,
                                  "--error", str(e))
```
The LLM Execution Function
```python
    def execute_with_llm(self, task_data):
        """Call an LLM to process the task."""
        import openai

        client = openai.OpenAI(api_key=os.environ["OPENAI_API_KEY"])
        task_type = task_data.get("type", "general")
        payload = task_data.get("payload", "")
        prompts = {
            "summarize": f"Summarize this text concisely:\n\n{payload}",
            "analyze": f"Analyze this data and provide key insights:\n\n{payload}",
            "code_review": f"Review this code for bugs and improvements:\n\n{payload}",
            "translate": f"Translate this to English:\n\n{payload}",
            "general": f"Process this request:\n\n{payload}",
        }
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": f"You are a {task_type} specialist agent."},
                {"role": "user", "content": prompts.get(task_type, prompts["general"])},
            ],
            max_tokens=1024,
        )
        return {
            "output": response.choices[0].message.content,
            "model": "gpt-4o-mini",
            "tokens_used": response.usage.total_tokens,
        }
```
Each agent is a specialist. One handles summarization, another does code review, another translates. The task type in the submission determines which prompt template is used. But the routing decision -- which agent gets the task -- is based entirely on polo score.
The Polo Score Formula
The polo score is how Pilot Protocol tracks reputation without a blockchain, without staking, and without tokens. It is computed from actual behavior: tasks completed, CPU time contributed, and execution efficiency.
The Formula
```
# Polo score reward for a completed task
reward = round(1 + log2(1 + cpu_minutes)) * efficiency
```
Where:
- cpu_minutes is the self-reported CPU time spent executing the task
- efficiency is a multiplier based on task success rate (1.0 for 100% success, decays with failures)
- The log2 curve means diminishing returns on long tasks -- you cannot game the system by running a task for hours
- round() ensures integer score increments for small tasks
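The formula translates directly into Python. A minimal sketch, assuming the registry truncates the fractional post-efficiency reward to an integer when crediting the score (consistent with the worked scenarios that follow):

```python
import math

def polo_reward(cpu_minutes, efficiency=1.0):
    """Reward for one completed task:
    round(1 + log2(1 + cpu_minutes)) * efficiency."""
    return round(1 + math.log2(1 + cpu_minutes)) * efficiency

# Quick 8-second task, clean history
print(polo_reward(8 / 60))          # -> 1.0
# Heavy 5-minute task, clean history
print(polo_reward(5.0))             # -> 4.0
# Same task at 70% efficiency: 2.8 raw, credited as 2 after truncation
print(int(polo_reward(5.0, 0.7)))   # -> 2
```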
Worked Examples
Let us trace through three scenarios to see how the score evolves.
Scenario 1: Quick summarization task. Agent completes a text summarization in 8 seconds (0.133 CPU minutes). Efficiency is 1.0 (no failures).
```
reward = round(1 + log2(1 + 0.133)) * 1.0
       = round(1 + log2(1.133))
       = round(1 + 0.180)
       = round(1.180)
       = 1
```
One point. Small tasks earn small rewards. This prevents agents from spamming trivial tasks to inflate their score.
Scenario 2: Heavy code analysis. Agent spends 5 CPU minutes analyzing a large codebase. Efficiency is 1.0.
```
reward = round(1 + log2(1 + 5.0)) * 1.0
       = round(1 + log2(6.0))
       = round(1 + 2.585)
       = round(3.585)
       = 4
```
Four points. More work earns more, but the logarithmic curve means 5 minutes of CPU earns 4x a quick task, not 40x.
Scenario 3: Unreliable agent. Same 5-minute task, but this agent has failed 3 out of its last 10 tasks. Efficiency drops to 0.7.
```
reward = round(1 + log2(1 + 5.0)) * 0.7
       = round(3.585) * 0.7
       = 4 * 0.7
       = 2.8   # truncated to 2
```
Two points instead of four. Unreliability has a direct cost. Over time, unreliable agents accumulate score slower and get selected less often, creating a feedback loop that pushes them out of the high-value task pool.
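The article does not spell out the exact efficiency formula, but the numbers above are consistent with a simple rolling success rate over recent tasks. A hypothetical sketch of that interpretation:

```python
def efficiency(outcomes, window=10):
    """Hypothetical efficiency multiplier: the success rate over the
    last `window` task outcomes (True = completed, False = failed)."""
    recent = list(outcomes)[-window:]
    if not recent:
        return 1.0  # No history yet: give new agents full credit
    return sum(recent) / len(recent)

# 7 successes and 3 failures in the last 10 tasks gives 0.7,
# matching Scenario 3 above
history = [True] * 7 + [False] * 3
print(efficiency(history))  # -> 0.7
```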
The Emergence: Self-Organization in Action
Now we wire it all together. Here is the main loop that each agent runs.
```python
import threading
import math


def run_agent(agent_id, role, registry_addr):
    agent = SwarmAgent(agent_id, role, registry_addr)
    agent.start_daemon()
    agent.register()
    time.sleep(5)  # Let other agents register
    agent.discover_peers()
    agent.establish_trust()

    # Start task listener in background
    listener = threading.Thread(
        target=agent.listen_for_tasks, daemon=True
    )
    listener.start()

    # Main loop: periodically submit tasks to peers
    task_types = ["summarize", "analyze", "code_review", "translate"]
    while True:
        # Refresh peer info (polo scores change over time)
        agent.discover_peers()
        # Pick a task type and find the best peer for it
        task_type = random.choice(task_types)
        role_for_task = {
            "summarize": "summarizer",
            "analyze": "analyzer",
            "code_review": "reviewer",
            "translate": "translator",
        }.get(task_type, "general")
        target = agent.select_best_peer(role_for_task)
        if target is None:
            time.sleep(5)
            continue
        # Submit the task; generate_sample_payload() is a small helper
        # (not shown) that returns demo text for the task type
        try:
            agent.submit_task(target, {
                "type": task_type,
                "payload": generate_sample_payload(task_type),
            })
        except Exception as e:
            print(f"[{agent.hostname}] Submit failed: {e}")
        time.sleep(random.uniform(10, 30))  # Stagger submissions
```
Spawning the Swarm
```python
def main():
    registry_addr = "rendezvous.example.com:9000"
    # Define the swarm: 10 agents with different roles
    agents = [
        (0, "summarizer"),
        (1, "summarizer"),
        (2, "analyzer"),
        (3, "analyzer"),
        (4, "reviewer"),
        (5, "reviewer"),
        (6, "translator"),
        (7, "translator"),
        (8, "general"),  # Generalist: accepts any task type
        (9, "general"),
    ]
    threads = []
    for agent_id, role in agents:
        t = threading.Thread(
            target=run_agent,
            args=(agent_id, role, registry_addr),
        )
        t.start()
        threads.append(t)
        time.sleep(1)  # Stagger daemon startups
    for t in threads:
        t.join()


if __name__ == "__main__":
    main()
```
What Happens Over Time
Run this for 30 minutes and watch the polo scores. Here is what typically emerges:
- Minutes 0-5: All agents have polo score 0. Task routing is essentially random (the 80/20 split means everything is exploration).
- Minutes 5-10: Some agents complete tasks faster than others. Maybe swarm-summarizer-0 has lower latency to the LLM API. It accumulates score faster and starts getting 80% of summarization tasks.
- Minutes 10-20: The score gap widens. High-scoring agents get more work, complete more tasks, and earn more score. A positive feedback loop forms. But the 20% exploration rate keeps the second-tier agents in the game.
- Minutes 20-30: The swarm has self-organized. Each role has a clear "preferred" agent and a backup. If the preferred agent goes down, the backup's exploration-earned score makes it the immediate replacement. No failover logic needed.
This is the key insight: The swarm does not need an orchestrator because reputation is the orchestrator. The polo score encodes "who is good at what" as a number. The routing algorithm uses that number. Emergence happens automatically.
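The feedback loop shows up even in a toy model, independent of Pilot itself. This deterministic sketch routes expected task flow between two peers with the 80/20 rule, where one peer earns slightly more score per task; the small edge compounds into a large gap:

```python
def simulate(rounds=2000, explore=0.2):
    """Toy model: route expected task mass with the 80/20 rule.
    The 'fast' agent earns slightly more score per unit of work."""
    scores = {"fast": 0.0, "slow": 0.0}
    reward = {"fast": 1.0, "slow": 0.9}
    for _ in range(rounds):
        if scores["fast"] == scores["slow"]:
            # Tie (e.g. at startup): routing is effectively random
            share = {"fast": 0.5, "slow": 0.5}
        else:
            leader = max(scores, key=scores.get)
            other = "slow" if leader == "fast" else "fast"
            share = {leader: 1 - explore, other: explore}
        for name in scores:
            scores[name] += share[name] * reward[name]
    return scores

print(simulate())  # "fast" ends far ahead of "slow"
```

This is only a caricature of the real dynamics (real routing is stochastic and per-task), but it captures the mechanism: a small performance edge plus score-weighted routing yields a stable primary/backup split.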
Monitoring the Swarm
You can watch the self-organization happen in real time by polling the registry for polo scores.
```python
def monitor_swarm(registry_addr):
    """Print polo score leaderboard every 30 seconds."""
    while True:
        output = subprocess.run(
            ["pilotctl", "lookup", "--tag", "swarm=demo"],
            capture_output=True, text=True
        ).stdout
        agents = json.loads(output)
        # Sort by polo score
        agents.sort(key=lambda a: a.get("polo_score", 0), reverse=True)
        print("\n--- Swarm Leaderboard ---")
        print(f"{'Hostname':<25} {'Role':<12} {'Score':<8} {'Tasks':<8}")
        print("-" * 55)
        for a in agents:
            print(f"{a.get('hostname', 'unknown'):<25} "
                  f"{a.get('tags', {}).get('role', '?'):<12} "
                  f"{a.get('polo_score', 0):<8.1f} "
                  f"{a.get('tasks_completed', 0):<8}")
        time.sleep(30)
```
After 30 minutes, the leaderboard might look like this:
```
--- Swarm Leaderboard ---
Hostname                  Role         Score    Tasks
-------------------------------------------------------
swarm-summarizer-0        summarizer   47.0     38
swarm-analyzer-2          analyzer     41.0     29
swarm-reviewer-4          reviewer     38.0     31
swarm-translator-7        translator   35.0     27
swarm-general-8           general      28.0     22
swarm-summarizer-1        summarizer   19.0     16
swarm-analyzer-3          analyzer     17.0     14
swarm-reviewer-5          reviewer     15.0     13
swarm-translator-6        translator   12.0     10
swarm-general-9           general      9.0      8
```
The top agent in each role has roughly 2-3x the score of the backup. The swarm has organized itself into primary and secondary workers without any explicit assignment.
Scaling to 100 Agents
Ten agents is a demo. Can this approach scale? We tested with 100 agents on 5 VMs (20 agents per VM) and found two things that matter.
Memory: 10 MB Per Daemon
Each Pilot daemon uses approximately 10 MB of RSS. On a 16 GB VM, you can run 200+ daemons comfortably. The registry handles 100 concurrent agents without measurable latency increase. The benchmark data confirms the per-connection memory stays flat.
```shell
# Memory usage across 100 daemons on a single VM
ps aux | grep pilot-daemon | awk '{sum += $6} END {print sum/1024 "MB"}'
1024MB   # ~10 MB per daemon average
```
Polo Gate: Preventing Low-Rep Spam
At 100 agents, a new problem appears: low-reputation agents can spam task submissions to high-value workers, consuming their cycles with junk work. The polo gate prevents this.
```shell
# Agents can set a minimum polo score for incoming tasks
pilotctl set-polo-gate --min-score 10
```
With a polo gate of 10, new agents (score 0) cannot submit tasks to high-value workers. They must first build reputation by completing tasks from other new agents or from established agents that explicitly submit to them. This creates a natural onboarding ramp:
- New agent joins with score 0
- Picks up tasks from other low-score agents (or via the 20% exploration rate)
- Builds score to 10+
- Can now submit tasks to high-value workers
This mirrors how reputation works in real economies. You start small, prove yourself, and earn access to higher-value networks.
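The daemon enforces the gate, but its effect is easy to illustrate locally. This sketch is purely illustrative -- the function names, task shapes, and peer addresses are hypothetical, not Pilot's API:

```python
def passes_polo_gate(submitter_score, min_score=10):
    """Would a submission clear a polo gate of min_score?"""
    return submitter_score >= min_score

def filter_submissions(tasks, peer_scores, min_score=10):
    """Partition incoming tasks by whether the submitter clears the gate."""
    accepted, rejected = [], []
    for task in tasks:
        score = peer_scores.get(task["submitter"], 0)
        (accepted if passes_polo_gate(score, min_score) else rejected).append(task)
    return accepted, rejected

# Hypothetical peers: one established, one brand new
peer_scores = {"veteran": 47, "newcomer": 0}
tasks = [
    {"submitter": "veteran", "type": "analyze"},
    {"submitter": "newcomer", "type": "analyze"},
]
accepted, rejected = filter_submissions(tasks, peer_scores)
print(len(accepted), len(rejected))  # -> 1 1
```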
Network Topology at Scale
With 100 agents, the trust mesh becomes important. Full mesh (every agent trusts every other) means 4,950 handshake pairs. That is expensive. In practice, agents should trust only peers they actually interact with. The swarm naturally converges on a sparse trust graph where each agent trusts 10-20 peers based on role affinity.
```python
    def selective_trust(self):
        """Only trust peers in roles we submit tasks to."""
        # get_target_roles() is a small helper (not shown) returning
        # the set of roles this agent routes work to
        needed_roles = self.get_target_roles()
        for addr, info in self.peers.items():
            if info["role"] in needed_roles and not info["trusted"]:
                self.pilotctl("handshake", addr)
                self.peers[addr]["trusted"] = True
```
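The trust-mesh arithmetic is worth checking: a full mesh of n agents needs n(n-1)/2 mutual handshakes, while a sparse graph grows linearly. A quick sanity check:

```python
def full_mesh_pairs(n):
    """Number of mutual-handshake pairs in a full trust mesh."""
    return n * (n - 1) // 2

def sparse_edges(n, avg_peers):
    """Approximate trust edges when each agent trusts avg_peers others
    (each edge is shared by two agents)."""
    return n * avg_peers // 2

print(full_mesh_pairs(100))   # -> 4950
print(full_mesh_pairs(10))    # -> 45, cheap enough for the demo swarm
print(sparse_edges(100, 15))  # -> 750, assuming ~15 trusted peers each
```

At 10 agents the full mesh is trivial, which is why the demo trusts everyone; at 100 the sparse graph cuts handshake maintenance by more than 6x.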
Adding Fault Tolerance
A real swarm needs to handle agent failures. Pilot makes this straightforward because the registry tracks liveness via keepalive probes.
```python
    def submit_with_retry(self, task_data, role, max_retries=3):
        """Submit a task with automatic failover to next-best peer."""
        for attempt in range(max_retries):
            target = self.select_best_peer(role)
            if target is None:
                print(f"[{self.hostname}] No peers available for {role}")
                return None
            try:
                task_id = self.submit_task(target, task_data)
                # Wait for result with timeout
                result = self.wait_for_result(task_id, timeout=120)
                return result
            except Exception as e:
                print(f"[{self.hostname}] Attempt {attempt+1} failed: {e}")
                # Mark peer as temporarily unreliable
                self.peers[target]["polo_score"] *= 0.5
                continue
        return None

    def wait_for_result(self, task_id, timeout=120):
        """Poll for task completion."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            output = self.pilotctl("task", "status", "--id", task_id)
            status = json.loads(output)
            if status["state"] == "completed":
                return status["result"]
            if status["state"] == "failed":
                raise RuntimeError(f"Task failed: {status.get('error')}")
            time.sleep(2)
        raise TimeoutError(f"Task {task_id} timed out")
```
When a task fails, the agent halves the peer's local polo score estimate and retries with the next-best peer. The real polo score in the registry also degrades because the failed task hurts the peer's efficiency multiplier. Over time, unreliable agents sink to the bottom of the selection pool.
The Complete Agent Script
Here is the full script that ties everything together. Save it as swarm_agent.py and run with the agent ID and role as arguments.
```python
#!/usr/bin/env python3
"""Pilot Protocol swarm agent with reputation-based self-organization."""
import subprocess
import json
import time
import random
import threading
import os
import sys
import math

REGISTRY_ADDR = os.environ.get("REGISTRY_ADDR", "rendezvous.example.com:9000")


class SwarmAgent:
    def __init__(self, agent_id, role):
        self.agent_id = agent_id
        self.role = role
        self.hostname = f"swarm-{role}-{agent_id}"
        self.peers = {}
        self.tasks_completed = 0

    def pilotctl(self, *args):
        cmd = ["pilotctl"] + list(args)
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if result.returncode != 0:
            raise RuntimeError(result.stderr)
        return result.stdout.strip()

    def start(self):
        subprocess.Popen([
            "pilot-daemon",
            "-registry-addr", REGISTRY_ADDR,
            "-beacon-addr", REGISTRY_ADDR.replace(":9000", ":9001"),
        ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        time.sleep(2)
        self.pilotctl("set-hostname", self.hostname)
        self.pilotctl("set-tags", f"role={self.role},swarm=demo")
        self.pilotctl("task-opt-in")

    def discover(self):
        output = self.pilotctl("lookup", "--tag", "swarm=demo")
        my_addr = self.my_address()
        for peer in json.loads(output):
            addr = peer["address"]
            if addr != my_addr:
                self.peers[addr] = {
                    "hostname": peer.get("hostname", "?"),
                    "role": peer.get("tags", {}).get("role", "?"),
                    "polo_score": peer.get("polo_score", 0),
                    "trusted": self.peers.get(addr, {}).get("trusted", False),
                }

    def my_address(self):
        return json.loads(self.pilotctl("status"))["address"]

    def trust_peers(self):
        for addr, info in self.peers.items():
            if not info["trusted"]:
                try:
                    self.pilotctl("handshake", addr)
                    info["trusted"] = True
                except RuntimeError:
                    pass

    def best_peer(self, role):
        candidates = [
            (a, i) for a, i in self.peers.items()
            if i["role"] == role and i["trusted"]
        ]
        if not candidates:
            return None
        candidates.sort(key=lambda x: x[1]["polo_score"], reverse=True)
        if random.random() < 0.2 and len(candidates) > 1:
            return random.choice(candidates[1:])[0]
        return candidates[0][0]

    def run(self):
        self.start()
        time.sleep(5)
        self.discover()
        self.trust_peers()
        threading.Thread(target=self.listen, daemon=True).start()
        roles = ["summarizer", "analyzer", "reviewer", "translator"]
        while True:
            self.discover()
            role = random.choice(roles)
            target = self.best_peer(role)
            if target:
                try:
                    self.pilotctl("task", "submit", "--to", target,
                                  "--data", json.dumps({"type": role, "payload": "sample"}))
                except RuntimeError:
                    pass
            time.sleep(random.uniform(10, 30))

    def listen(self):
        while True:
            try:
                output = self.pilotctl("task", "poll", "--timeout", "10")
                if not output:
                    continue
                task = json.loads(output)
                self.pilotctl("task", "accept", "--id", task["task_id"])
                start = time.time()
                result = {"output": f"Processed by {self.hostname}"}
                elapsed = time.time() - start
                self.pilotctl("task", "complete", "--id", task["task_id"],
                              "--result", json.dumps(result),
                              "--cpu-minutes", str(round(elapsed / 60, 4)))
                self.tasks_completed += 1
            except Exception:
                continue


if __name__ == "__main__":
    agent_id = int(sys.argv[1])
    role = sys.argv[2]
    SwarmAgent(agent_id, role).run()
```
Run 10 instances:
```shell
python swarm_agent.py 0 summarizer &
python swarm_agent.py 1 summarizer &
python swarm_agent.py 2 analyzer &
python swarm_agent.py 3 analyzer &
python swarm_agent.py 4 reviewer &
python swarm_agent.py 5 reviewer &
python swarm_agent.py 6 translator &
python swarm_agent.py 7 translator &
python swarm_agent.py 8 general &
python swarm_agent.py 9 general &
```
What You Have Built
This is not a toy demo. The patterns here -- discovery, trust, task delegation, reputation-based routing, fault tolerance -- are the building blocks of production multi-agent systems. The difference from conventional architectures is what is missing: no orchestrator, no message queue, no service mesh, no load balancer, no health check infrastructure.
The swarm self-organizes because the incentive structure is correct. Agents that do good work earn reputation. Reputation earns more work. More work earns more reputation. Bad actors see their efficiency multiplier decay, their score stagnate, and their task flow dry up. The system is self-correcting.
For a deeper look at the polo score design and its gaming resistance properties, read The Polo Score: Designing a Reputation System Without Blockchain. For the network fundamentals, see How Pilot Protocol Works.
Build Your Own Swarm
Everything in this tutorial runs on the open-source Pilot Protocol. Clone the repo and start swarming.