Building a Multi-Agent System with A2A Protocol and Docker


The Agent2Agent (A2A) protocol, developed by Google and now under the Linux Foundation, is an open standard that lets AI agents built on different frameworks communicate and collaborate. Unlike MCP (Model Context Protocol), which focuses on providing tools to a single agent, A2A covers agent-to-agent communication: agents can delegate tasks, share context, and work together as peers.

In this guide, we’ll build a local multi-agent system with three specialized agents that collaborate on a cybersecurity analysis task, all running in Docker containers on macOS.

What We’re Building

We’ll create a system with three specialized agents:

  1. Orchestrator Agent - Coordinates tasks and routes requests to appropriate specialist agents
  2. Threat Intel Agent - Analyzes IP addresses and domains for potential threats
  3. Log Analysis Agent - Parses and analyzes security logs for suspicious patterns

These agents will communicate using the A2A protocol, demonstrating how different specialized systems can work together seamlessly.

┌─────────────────────────────────────────────────────────────┐
│                      User / Client                          │
└─────────────────────┬───────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────────────────────┐
│              Orchestrator Agent (Port 8000)                 │
│         Routes requests to specialist agents                │
└─────────┬───────────────────────────────────┬───────────────┘
          │                                   │
          ▼                                   ▼
┌─────────────────────────┐   ┌─────────────────────────────┐
│  Threat Intel Agent     │   │   Log Analysis Agent        │
│     (Port 8001)         │   │       (Port 8002)           │
│  - IP reputation        │   │  - Parse log files          │
│  - Domain analysis      │   │  - Detect anomalies         │
│  - IOC lookup           │   │  - Pattern matching         │
└─────────────────────────┘   └─────────────────────────────┘

Prerequisites

Before we begin, ensure you have the following installed on your macOS system:

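  • Docker Desktop for Mac - Provides Docker Engine and Docker Compose
  • Python 3.11+ - For running the test client and the A2A inspector tool
  • Git - For cloning the A2A inspector repository
  • Node.js and npm - For the inspector's npm commands in Step 9

# Verify installations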
docker --version
python3 --version
git --version

Project Structure

Create the following directory structure for our multi-agent system:

a2a-security-agents/
├── docker-compose.yml
├── shared/
│   └── requirements.txt
├── orchestrator/
│   ├── Dockerfile
│   └── agent.py
├── threat-intel/
│   ├── Dockerfile
│   └── agent.py
├── log-analysis/
│   ├── Dockerfile
│   └── agent.py
└── client/
    └── test_client.py

Step 1: Create the Shared Requirements

First, let’s create the shared Python dependencies that all agents will use:

mkdir -p a2a-security-agents/shared
cd a2a-security-agents

Create shared/requirements.txt:

a2a-sdk[http-server]>=0.3.0
uvicorn>=0.30.0
httpx>=0.27.0

Step 2: Create the Threat Intel Agent

This agent specializes in analyzing IP addresses and domains for potential threats.

Create threat-intel/agent.py:

"""Threat Intelligence Agent - Analyzes IPs and domains for threats."""

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (
    AgentCapabilities,
    AgentCard,
    AgentSkill,
    TaskState,
)
from a2a.utils import new_agent_text_message


# Simulated threat intelligence database
THREAT_DB = {
    "192.168.1.100": {"risk": "low", "category": "internal", "notes": "Private network"},
    "45.33.32.156": {"risk": "high", "category": "scanner", "notes": "Known Nmap scan host"},
    "185.220.101.1": {"risk": "critical", "category": "tor_exit", "notes": "Tor exit node"},
    "8.8.8.8": {"risk": "low", "category": "dns", "notes": "Google DNS"},
    "malware.example.com": {"risk": "critical", "category": "c2", "notes": "Known C2 domain"},
    "suspicious.site": {"risk": "high", "category": "phishing", "notes": "Reported phishing"},
}


class ThreatIntelExecutor(AgentExecutor):
    """Executes threat intelligence analysis tasks."""

    async def execute(
        self, context: RequestContext, event_queue: EventQueue
    ) -> None:
        """Analyze the provided IOC (IP or domain) for threats."""
        # TaskUpdater publishes status changes for this task on the event queue
        updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        if not context.current_task:
            await updater.submit()

        # Report progress before doing the lookup
        await updater.update_status(
            TaskState.working,
            message=new_agent_text_message(
                "Analyzing threat intelligence...",
                context.context_id,
                context.task_id,
            ),
        )

        # Extract the query text from the incoming message
        query = context.get_user_input()

        # Parse the IOC from the query
        ioc = self._extract_ioc(query)

        # Look up threat intelligence
        if ioc in THREAT_DB:
            intel = THREAT_DB[ioc]
            response = (
                f"## Threat Intelligence Report for `{ioc}`\n\n"
                f"| Field | Value |\n"
                f"|-------|-------|\n"
                f"| **Risk Level** | {intel['risk'].upper()} |\n"
                f"| **Category** | {intel['category']} |\n"
                f"| **Notes** | {intel['notes']} |\n\n"
                f"**Recommendation**: "
            )
            if intel['risk'] == 'critical':
                response += "BLOCK immediately and investigate all connections."
            elif intel['risk'] == 'high':
                response += "Monitor closely and consider blocking."
            else:
                response += "No immediate action required."
        else:
            response = (
                f"## Threat Intelligence Report for `{ioc}`\n\n"
                f"**Status**: No threat data found in database.\n\n"
                f"**Risk Level**: UNKNOWN\n\n"
                f"**Recommendation**: Perform additional OSINT analysis."
            )

        # Mark the task completed and attach the report as the final message
        await updater.complete(
            new_agent_text_message(response, context.context_id, context.task_id)
        )

    async def cancel(
        self, context: RequestContext, event_queue: EventQueue
    ) -> None:
        """Cancellation is not supported by this simple agent."""
        raise Exception("cancel not supported")

    def _extract_ioc(self, query: str) -> str:
        """Extract IOC from query string."""
        # Simple extraction - in production, use regex
        words = query.split()
        for word in words:
            word = word.strip('`"\'')
            if '.' in word:  # IP or domain
                return word
        return query.strip()


# Define the agent card
agent_card = AgentCard(
    name="Threat Intelligence Agent",
    description="Analyzes IP addresses and domains for known threats and provides risk assessments.",
    url="http://threat-intel:8001",
    version="1.0.0",
    defaultInputModes=["text"],
    defaultOutputModes=["text"],
    capabilities=AgentCapabilities(
        streaming=True,
        pushNotifications=False,
    ),
    skills=[
        AgentSkill(
            id="threat-lookup",
            name="Threat Lookup",
            description="Look up an IP address or domain in threat intelligence databases",
            tags=["security", "threat-intel", "ioc"],
        ),
        AgentSkill(
            id="risk-assessment",
            name="Risk Assessment",
            description="Provide risk assessment for network indicators",
            tags=["security", "risk", "analysis"],
        ),
    ],
)

# Create the application; the request handler needs a task store to track task state
handler = DefaultRequestHandler(
    agent_executor=ThreatIntelExecutor(),
    task_store=InMemoryTaskStore(),
)

app = A2AStarletteApplication(agent_card=agent_card, http_handler=handler)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app.build(), host="0.0.0.0", port=8001)
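
The `_extract_ioc` helper above takes the first whitespace-separated word that contains a dot, so trailing punctuation (for example `8.8.8.8.` at the end of a sentence) ends up in the lookup key, and its own comment suggests a regex for production use. A minimal sketch of that idea follows, assuming only IPv4 addresses and simple domain names need to be recognised. The function name `extract_ioc` and both patterns are illustrative, not part of the agent or the A2A SDK.

```python
import ipaddress
import re
from typing import Optional

# Candidate IPv4 literal: four dot-separated groups of 1-3 digits.
IPV4_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
# Simple domain: dot-separated labels ending in an alphabetic TLD.
DOMAIN_RE = re.compile(
    r"\b(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z]{2,}\b", re.IGNORECASE
)


def extract_ioc(query: str) -> Optional[str]:
    """Return the first valid IPv4 address or domain in the query, else None."""
    for candidate in IPV4_RE.findall(query):
        try:
            # Reject look-alikes such as 999.1.1.1
            ipaddress.IPv4Address(candidate)
        except ValueError:
            continue
        return candidate
    match = DOMAIN_RE.search(query)
    return match.group(0).lower() if match else None
```

Returning `None` when nothing matches lets the caller distinguish "no indicator in the request" from "indicator not in the database", which the original fallback of returning the whole query cannot do.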

Create threat-intel/Dockerfile:

FROM python:3.11-slim

WORKDIR /app

COPY shared/requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

COPY threat-intel/agent.py /app/agent.py

EXPOSE 8001

CMD ["python", "agent.py"]

Step 3: Create the Log Analysis Agent

This agent specializes in parsing and analyzing security logs.

Create log-analysis/agent.py:

"""Log Analysis Agent - Parses and analyzes security logs."""

import re
from datetime import datetime
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (
    AgentCapabilities,
    AgentCard,
    AgentSkill,
    TaskState,
)
from a2a.utils import new_agent_text_message


class LogAnalysisExecutor(AgentExecutor):
    """Executes log analysis tasks."""

    # Patterns for detecting suspicious activity
    SUSPICIOUS_PATTERNS = {
        "brute_force": r"Failed password.*from (\d+\.\d+\.\d+\.\d+)",
        # Also matches statements without FROM, such as DROP TABLE
        "sql_injection": r"\b(UNION\s+SELECT|SELECT\b.*\bFROM|INSERT\s+INTO|DELETE\s+FROM|DROP\s+TABLE)\b",
        "path_traversal": r"\.\./",
        "xss_attempt": r"<script>|javascript:|onerror=",
        "shell_command": r"(;|&&|\|)\s*(cat|ls|rm|wget|curl)",
    }

    async def execute(
        self, context: RequestContext, event_queue: EventQueue
    ) -> None:
        """Analyze the provided log content for suspicious patterns."""
        # TaskUpdater publishes status changes for this task on the event queue
        updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        if not context.current_task:
            await updater.submit()

        # Extract the log content from the incoming message
        log_content = context.get_user_input()

        await updater.update_status(
            TaskState.working,
            message=new_agent_text_message(
                "Analyzing log content for suspicious patterns...",
                context.context_id,
                context.task_id,
            ),
        )

        # Analyze the logs
        findings = self._analyze_logs(log_content)

        # Format the response
        if findings:
            response = "## Log Analysis Report\n\n"
            response += f"**Analysis Time**: {datetime.now().isoformat()}\n\n"
            response += "### Findings\n\n"
            response += "| Severity | Pattern Type | Details | Line |\n"
            response += "|----------|--------------|---------|------|\n"

            for finding in findings:
                response += f"| {finding['severity']} | {finding['type']} | {finding['detail'][:50]}... | {finding['line']} |\n"

            response += f"\n**Total Findings**: {len(findings)}\n\n"

            # Add recommendations
            response += "### Recommendations\n\n"
            if any(f['severity'] == 'CRITICAL' for f in findings):
                response += "- **IMMEDIATE ACTION**: Critical security events detected. Investigate immediately.\n"
            if any(f['type'] == 'brute_force' for f in findings):
                response += "- Consider implementing fail2ban or rate limiting.\n"
            if any(f['type'] in ['sql_injection', 'xss_attempt'] for f in findings):
                response += "- Review input validation in web applications.\n"
        else:
            response = (
                "## Log Analysis Report\n\n"
                f"**Analysis Time**: {datetime.now().isoformat()}\n\n"
                "**Status**: No suspicious patterns detected in the provided logs.\n\n"
                "The logs appear to show normal activity."
            )

        # Mark the task completed and attach the report as the final message
        await updater.complete(
            new_agent_text_message(response, context.context_id, context.task_id)
        )

    async def cancel(
        self, context: RequestContext, event_queue: EventQueue
    ) -> None:
        """Cancellation is not supported by this simple agent."""
        raise Exception("cancel not supported")

    def _analyze_logs(self, log_content: str) -> list:
        """Analyze log content for suspicious patterns."""
        findings = []
        lines = log_content.split('\n')

        for line_num, line in enumerate(lines, 1):
            for pattern_type, pattern in self.SUSPICIOUS_PATTERNS.items():
                if re.search(pattern, line, re.IGNORECASE):
                    severity = self._get_severity(pattern_type)
                    findings.append({
                        'type': pattern_type,
                        'severity': severity,
                        'detail': line.strip(),
                        'line': line_num,
                    })

        return findings

    def _get_severity(self, pattern_type: str) -> str:
        """Get severity level for pattern type."""
        critical_patterns = {'sql_injection', 'shell_command'}
        high_patterns = {'xss_attempt', 'path_traversal'}

        if pattern_type in critical_patterns:
            return 'CRITICAL'
        elif pattern_type in high_patterns:
            return 'HIGH'
        else:
            return 'MEDIUM'


# Define the agent card
agent_card = AgentCard(
    name="Log Analysis Agent",
    description="Parses and analyzes security logs to detect suspicious patterns and potential threats.",
    url="http://log-analysis:8002",
    version="1.0.0",
    defaultInputModes=["text"],
    defaultOutputModes=["text"],
    capabilities=AgentCapabilities(
        streaming=True,
        pushNotifications=False,
    ),
    skills=[
        AgentSkill(
            id="log-parsing",
            name="Log Parsing",
            description="Parse various log formats including syslog, Apache, and application logs",
            tags=["security", "logs", "parsing"],
        ),
        AgentSkill(
            id="anomaly-detection",
            name="Anomaly Detection",
            description="Detect suspicious patterns and anomalies in log data",
            tags=["security", "anomaly", "detection"],
        ),
    ],
)

# Create the application; the request handler needs a task store to track task state
handler = DefaultRequestHandler(
    agent_executor=LogAnalysisExecutor(),
    task_store=InMemoryTaskStore(),
)

app = A2AStarletteApplication(agent_card=agent_card, http_handler=handler)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app.build(), host="0.0.0.0", port=8002)
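
The `brute_force` pattern captures the source IP address, but the executor reports every matching line as a separate finding and never uses the captured group. A minimal sketch of grouping failed logins per source address is shown here, assuming the same sshd-style log lines used later in the test client. The helper names and the default threshold of 3 are illustrative choices, not part of the agent above.

```python
import re
from collections import Counter

FAILED_LOGIN_RE = re.compile(r"Failed password.*from (\d+\.\d+\.\d+\.\d+)")


def count_failed_logins(log_content: str) -> Counter:
    """Count failed password attempts per source IP."""
    counts: Counter = Counter()
    for line in log_content.splitlines():
        match = FAILED_LOGIN_RE.search(line)
        if match:
            counts[match.group(1)] += 1
    return counts


def flag_brute_force(log_content: str, threshold: int = 3) -> list:
    """Return source IPs with at least `threshold` failed attempts, busiest first."""
    counts = count_failed_logins(log_content)
    return [ip for ip, attempts in counts.most_common() if attempts >= threshold]
```

A per-source count of this kind gives one finding per attacker rather than one per log line, which keeps the report table short when an attack produces hundreds of entries.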

Create log-analysis/Dockerfile:

FROM python:3.11-slim

WORKDIR /app

COPY shared/requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

COPY log-analysis/agent.py /app/agent.py

EXPOSE 8002

CMD ["python", "agent.py"]

Step 4: Create the Orchestrator Agent

The orchestrator coordinates between the specialist agents and routes tasks appropriately.

Create orchestrator/agent.py:

"""Orchestrator Agent - Coordinates tasks between specialist agents."""

import uuid

import httpx
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import (
    AgentCapabilities,
    AgentCard,
    AgentSkill,
    TaskState,
)
from a2a.utils import new_agent_text_message


# Agent registry - in production, use proper service discovery
AGENT_REGISTRY = {
    "threat-intel": {
        "url": "http://threat-intel:8001",
        "keywords": ["ip", "domain", "threat", "ioc", "reputation", "malicious"],
    },
    "log-analysis": {
        "url": "http://log-analysis:8002",
        "keywords": ["log", "analyze", "parse", "suspicious", "pattern", "audit"],
    },
}


class OrchestratorExecutor(AgentExecutor):
    """Orchestrates tasks between specialist agents."""

    async def execute(
        self, context: RequestContext, event_queue: EventQueue
    ) -> None:
        """Route the request to appropriate specialist agent(s)."""
        # TaskUpdater publishes status changes for this task on the event queue
        updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        if not context.current_task:
            await updater.submit()

        # Extract the query text from the incoming message
        query = context.get_user_input()

        await updater.update_status(
            TaskState.working,
            message=new_agent_text_message(
                "Analyzing request and routing to specialist agents...",
                context.context_id,
                context.task_id,
            ),
        )

        # Determine which agent(s) to route to
        target_agents = self._route_request(query.lower())

        if not target_agents:
            await updater.complete(
                new_agent_text_message(
                    "## Request Analysis\n\n"
                    "I couldn't determine which specialist agent should handle this request.\n\n"
                    "**Available specialists:**\n"
                    "- **Threat Intel Agent**: Analyze IP addresses and domains\n"
                    "- **Log Analysis Agent**: Parse and analyze security logs\n\n"
                    "Please rephrase your request with more specific keywords.",
                    context.context_id,
                    context.task_id,
                )
            )
            return

        # Collect responses from all relevant agents
        responses = []

        for agent_name in target_agents:
            agent_info = AGENT_REGISTRY[agent_name]

            await updater.update_status(
                TaskState.working,
                message=new_agent_text_message(
                    f"Delegating to {agent_name} agent...",
                    context.context_id,
                    context.task_id,
                ),
            )

            try:
                response = await self._call_agent(agent_info["url"], query)
                responses.append({
                    "agent": agent_name,
                    "response": response,
                    "success": True,
                })
            except Exception as e:
                responses.append({
                    "agent": agent_name,
                    "response": str(e),
                    "success": False,
                })

        # Compile final response
        final_response = self._compile_response(query, responses)

        # Mark the task completed and attach the report as the final message
        await updater.complete(
            new_agent_text_message(final_response, context.context_id, context.task_id)
        )

    async def cancel(
        self, context: RequestContext, event_queue: EventQueue
    ) -> None:
        """Cancellation is not supported by this simple agent."""
        raise Exception("cancel not supported")

    def _route_request(self, query: str) -> list:
        """Determine which agents should handle the request."""
        target_agents = []

        for agent_name, agent_info in AGENT_REGISTRY.items():
            if any(keyword in query for keyword in agent_info["keywords"]):
                target_agents.append(agent_name)

        return target_agents

    async def _call_agent(self, agent_url: str, query: str) -> str:
        """Call a specialist agent via A2A protocol."""
        async with httpx.AsyncClient(timeout=30.0) as client:
            # Send a JSON-RPC message/send request; every A2A message needs a messageId
            response = await client.post(
                f"{agent_url}/",
                json={
                    "jsonrpc": "2.0",
                    "method": "message/send",
                    "params": {
                        "message": {
                            "kind": "message",
                            "role": "user",
                            "messageId": uuid.uuid4().hex,
                            "parts": [{"kind": "text", "text": query}],
                        }
                    },
                    "id": "1",
                },
            )
            response.raise_for_status()
            result = response.json()

            # Surface JSON-RPC errors instead of silently ignoring them
            if "error" in result:
                raise RuntimeError(result["error"].get("message", "Unknown A2A error"))

            # Extract the response message (the result is either a Task or a Message)
            if "result" in result:
                task_or_message = result["result"]
                if "status" in task_or_message and "message" in task_or_message["status"]:
                    message = task_or_message["status"]["message"]
                    if message and "parts" in message:
                        for part in message["parts"]:
                            if "text" in part:
                                return part["text"]
                elif "parts" in task_or_message:
                    for part in task_or_message["parts"]:
                        if "text" in part:
                            return part["text"]

            return "No response received from agent."

    def _compile_response(self, query: str, responses: list) -> str:
        """Compile responses from multiple agents into a unified report."""
        report = "# Security Analysis Report\n\n"
        report += f"**Original Query**: {query}\n\n"
        report += "---\n\n"

        for resp in responses:
            agent_name = resp["agent"].replace("-", " ").title()
            report += f"## {agent_name} Results\n\n"

            if resp["success"]:
                report += resp["response"] + "\n\n"
            else:
                report += f"**Error**: Failed to get response - {resp['response']}\n\n"

            report += "---\n\n"

        report += "*Report generated by Security Orchestrator Agent*"
        return report


# Define the agent card
agent_card = AgentCard(
    name="Security Orchestrator Agent",
    description="Coordinates security analysis tasks between specialist agents for comprehensive threat assessment.",
    url="http://localhost:8000",
    version="1.0.0",
    defaultInputModes=["text"],
    defaultOutputModes=["text"],
    capabilities=AgentCapabilities(
        streaming=True,
        pushNotifications=False,
    ),
    skills=[
        AgentSkill(
            id="orchestration",
            name="Task Orchestration",
            description="Route security analysis requests to appropriate specialist agents",
            tags=["security", "orchestration", "coordination"],
        ),
        AgentSkill(
            id="comprehensive-analysis",
            name="Comprehensive Analysis",
            description="Combine results from multiple specialist agents for thorough security assessment",
            tags=["security", "analysis", "comprehensive"],
        ),
    ],
)

# Create the application; the request handler needs a task store to track task state
handler = DefaultRequestHandler(
    agent_executor=OrchestratorExecutor(),
    task_store=InMemoryTaskStore(),
)

app = A2AStarletteApplication(agent_card=agent_card, http_handler=handler)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app.build(), host="0.0.0.0", port=8000)
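
The orchestrator's `_route_request` uses substring matching, so a short keyword such as `ip` also matches words like "script" or "description" and can send unrelated requests to the Threat Intel Agent. A minimal sketch of whole-word matching is given here, assuming the same keyword lists as `AGENT_REGISTRY`. The names `AGENT_KEYWORDS` and `route_request` are illustrative.

```python
import re

# Keyword lists copied from AGENT_REGISTRY in the orchestrator.
AGENT_KEYWORDS = {
    "threat-intel": ["ip", "domain", "threat", "ioc", "reputation", "malicious"],
    "log-analysis": ["log", "analyze", "parse", "suspicious", "pattern", "audit"],
}


def route_request(query: str, registry: dict = AGENT_KEYWORDS) -> list:
    """Return the agents whose keywords appear as whole words in the query."""
    # Split the query into lowercase alphanumeric tokens
    tokens = set(re.findall(r"[a-z0-9]+", query.lower()))
    return [
        name for name, keywords in registry.items()
        if tokens.intersection(keywords)
    ]
```

The trade-off is that whole-word matching no longer catches plurals: "logs" does not match `log`. If that matters, the plural forms can be added to the keyword lists, or the tokens can be stemmed before comparison.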

Create orchestrator/Dockerfile:

FROM python:3.11-slim

WORKDIR /app

COPY shared/requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

COPY orchestrator/agent.py /app/agent.py

EXPOSE 8000

CMD ["python", "agent.py"]

Step 5: Create Docker Compose Configuration

Create docker-compose.yml to orchestrate all three agents:

version: '3.8'

services:
  orchestrator:
    build:
      context: .
      dockerfile: orchestrator/Dockerfile
    ports:
      - "8000:8000"
    networks:
      - a2a-network
    depends_on:
      - threat-intel
      - log-analysis
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/.well-known/agent.json')"]
      interval: 10s
      timeout: 5s
      retries: 3

  threat-intel:
    build:
      context: .
      dockerfile: threat-intel/Dockerfile
    ports:
      - "8001:8001"
    networks:
      - a2a-network
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8001/.well-known/agent.json')"]
      interval: 10s
      timeout: 5s
      retries: 3

  log-analysis:
    build:
      context: .
      dockerfile: log-analysis/Dockerfile
    ports:
      - "8002:8002"
    networks:
      - a2a-network
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8002/.well-known/agent.json')"]
      interval: 10s
      timeout: 5s
      retries: 3

networks:
  a2a-network:
    driver: bridge

Step 6: Build and Run the System

Now let’s build and start all the agents:

# Build all containers
docker-compose build

# Start all agents
docker-compose up -d

# Check status
docker-compose ps

# View logs
docker-compose logs -f

You should see all three agents running (Compose prefixes container names with the project name, so the exact names may differ):

NAME                    STATUS    PORTS
orchestrator            running   0.0.0.0:8000->8000/tcp
threat-intel            running   0.0.0.0:8001->8001/tcp
log-analysis            running   0.0.0.0:8002->8002/tcp
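
Containers can show as running a moment before the agents inside them accept requests. A minimal sketch of a readiness check that polls each Agent Card URL until it responds is shown here, using only the standard library. The function names, the retry count, and the delay are assumptions chosen for illustration.

```python
import time
import urllib.error
import urllib.request

AGENT_CARD_URLS = [
    "http://localhost:8000/.well-known/agent.json",
    "http://localhost:8001/.well-known/agent.json",
    "http://localhost:8002/.well-known/agent.json",
]


def card_is_reachable(url: str, timeout: float = 2.0) -> bool:
    """Return True if the URL answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        return False


def wait_for_agents(urls, probe=card_is_reachable, attempts=10, delay=1.0,
                    sleep=time.sleep) -> list:
    """Poll until every URL responds; return the URLs that never did."""
    pending = list(urls)
    for attempt in range(attempts):
        # Keep only the URLs that are still failing
        pending = [url for url in pending if not probe(url)]
        if not pending:
            break
        if attempt < attempts - 1:
            sleep(delay)
    return pending
```

Calling `wait_for_agents(AGENT_CARD_URLS)` returns an empty list once all three agents answer. The `probe` and `sleep` parameters exist so the loop can be exercised without a network.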

Step 7: Verify Agent Cards

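Each A2A agent publishes an Agent Card at a well-known endpoint. Newer releases of the specification use /.well-known/agent-card.json, so try that path if agent.json returns a 404. Let’s verify: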

# Check orchestrator agent card
curl http://localhost:8000/.well-known/agent.json | jq

# Check threat intel agent card
curl http://localhost:8001/.well-known/agent.json | jq

# Check log analysis agent card
curl http://localhost:8002/.well-known/agent.json | jq
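
Beyond eyeballing the `jq` output, the same check can be scripted. A minimal sketch follows that fetches a card and reports missing fields. The field lists mirror what this tutorial's `AgentCard` and `AgentSkill` definitions set and are an assumption for illustration, not the full A2A schema; the function names are hypothetical.

```python
import json
import urllib.request
from typing import Any

# Fields set by the AgentCard definitions in this tutorial (not the full schema).
REQUIRED_CARD_FIELDS = ("name", "description", "url", "version", "capabilities", "skills")
REQUIRED_SKILL_FIELDS = ("id", "name", "description", "tags")


def fetch_agent_card(base_url: str, timeout: float = 5.0) -> dict:
    """Download and decode the Agent Card served by an agent."""
    with urllib.request.urlopen(f"{base_url}/.well-known/agent.json", timeout=timeout) as response:
        return json.load(response)


def validate_agent_card(card: dict) -> list:
    """Return a list of problems found in the card; an empty list means none."""
    problems = [
        f"missing field: {field}"
        for field in REQUIRED_CARD_FIELDS
        if field not in card
    ]
    for index, skill in enumerate(card.get("skills", [])):
        for field in REQUIRED_SKILL_FIELDS:
            if field not in skill:
                problems.append(f"skill[{index}] missing field: {field}")
    return problems
```

With the agents running, `validate_agent_card(fetch_agent_card("http://localhost:8001"))` should return an empty list for the Threat Intel Agent.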

Step 8: Test the Multi-Agent System

Create client/test_client.py for testing:

"""Test client for A2A multi-agent system."""

import uuid

import httpx


def send_message(url: str, message: str) -> dict:
    """Send a message to an A2A agent."""
    with httpx.Client(timeout=30.0) as client:
        response = client.post(
            f"{url}/",
            json={
                "jsonrpc": "2.0",
                "method": "message/send",
                "params": {
                    "message": {
                        "kind": "message",
                        "role": "user",
                        # Every A2A message needs a unique messageId
                        "messageId": uuid.uuid4().hex,
                        "parts": [{"kind": "text", "text": message}],
                    }
                },
                "id": "1",
            },
        )
        response.raise_for_status()
        return response.json()


def extract_response(result: dict) -> str:
    """Extract text response from A2A result."""
    if "result" in result:
        task = result["result"]
        if "status" in task and "message" in task["status"]:
            message = task["status"]["message"]
            if message and "parts" in message:
                for part in message["parts"]:
                    if "text" in part:
                        return part["text"]
    return str(result)


def main():
    print("=" * 60)
    print("A2A Multi-Agent Security System Test")
    print("=" * 60)

    # Test 1: Direct threat intel query
    print("\n[Test 1] Direct query to Threat Intel Agent")
    print("-" * 40)
    result = send_message(
        "http://localhost:8001",
        "Analyze the IP address 185.220.101.1 for threats"
    )
    print(extract_response(result))

    # Test 2: Direct log analysis query
    print("\n[Test 2] Direct query to Log Analysis Agent")
    print("-" * 40)
    log_sample = """
    Jan 15 10:23:45 server sshd: Failed password for admin from 192.168.1.50 port 22
    Jan 15 10:23:46 server sshd: Failed password for admin from 192.168.1.50 port 22
    Jan 15 10:23:47 server sshd: Failed password for admin from 192.168.1.50 port 22
    Jan 15 10:24:01 server webapp: SELECT * FROM users WHERE id=1 UNION SELECT * FROM passwords
    """
    result = send_message(
        "http://localhost:8002",
        f"Analyze these logs for suspicious activity:\n{log_sample}"
    )
    print(extract_response(result))

    # Test 3: Orchestrated query (routes to threat intel)
    print("\n[Test 3] Orchestrated query (threat intel routing)")
    print("-" * 40)
    result = send_message(
        "http://localhost:8000",
        "Check if the domain malware.example.com is malicious"
    )
    print(extract_response(result))

    # Test 4: Orchestrated query (routes to log analysis)
    print("\n[Test 4] Orchestrated query (log analysis routing)")
    print("-" * 40)
    result = send_message(
        "http://localhost:8000",
        "Parse and analyze this suspicious log entry:\nGET /admin?id=1;DROP TABLE users-- HTTP/1.1"
    )
    print(extract_response(result))

    print("\n" + "=" * 60)
    print("Tests completed!")
    print("=" * 60)


if __name__ == "__main__":
    main()

Run the test:

# Install httpx for the test client
pip install httpx

# Run tests
python client/test_client.py
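
A `message/send` result can be either a Task (with the text under `status.message`) or a plain Message (with the text directly under `parts`), and a failed call returns a JSON-RPC `error` object instead. A minimal sketch of a request builder and a response extractor that handle all three cases is shown here. The helper names `build_send_request`, `first_text`, and `extract_text` are illustrative and not part of any SDK.

```python
import uuid
from typing import Any, Optional


def build_send_request(text: str, request_id: str = "1") -> dict:
    """Build a JSON-RPC 2.0 envelope for the A2A message/send method."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "message/send",
        "params": {
            "message": {
                "kind": "message",
                "role": "user",
                "messageId": uuid.uuid4().hex,  # unique per message
                "parts": [{"kind": "text", "text": text}],
            }
        },
    }


def first_text(parts: Any) -> Optional[str]:
    """Return the text of the first text part, if any."""
    for part in parts or []:
        if isinstance(part, dict) and "text" in part:
            return part["text"]
    return None


def extract_text(response: dict) -> str:
    """Pull the agent's text out of a Task result, a Message result, or an error."""
    if "error" in response:
        return f"Agent error: {response['error'].get('message', 'unknown error')}"
    result = response.get("result") or {}
    # Task shape: text lives under status.message.parts
    status_message = (result.get("status") or {}).get("message") or {}
    text = first_text(status_message.get("parts")) or first_text(result.get("parts"))
    return text if text is not None else "No text found in response."
```

Reporting the JSON-RPC error message explicitly makes a rejected request (for example, one missing a required field) visible in the test output rather than printing the raw dictionary.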

Step 9: Using the A2A Inspector

The A2A project provides an inspector tool for debugging and testing agents interactively:

# Clone the inspector
git clone https://github.com/a2aproject/a2a-inspector.git
cd a2a-inspector

# Install dependencies
npm install

# Start the inspector
npm run dev

Open http://localhost:5173 in your browser and connect to your agents:

  1. Enter http://localhost:8000 for the orchestrator
  2. Click “Connect” to fetch the agent card
  3. Send test messages and observe the responses

Monitoring and Debugging

View Real-time Logs

# All agents
docker-compose logs -f

# Specific agent
docker-compose logs -f orchestrator
docker-compose logs -f threat-intel
docker-compose logs -f log-analysis

Check Network Communication

# Inspect the Docker network
docker network inspect a2a-security-agents_a2a-network

# Test connectivity between containers
docker-compose exec orchestrator python -c "import urllib.request; print(urllib.request.urlopen('http://threat-intel:8001/.well-known/agent.json').read().decode())"

Resource Usage

# Monitor resource usage
docker stats

Extending the System

Adding a New Agent

To add a new specialist agent (e.g., a Vulnerability Scanner):

  1. Create a new directory: vuln-scanner/
  2. Implement the agent following the same pattern
  3. Add to docker-compose.yml
  4. Update the orchestrator’s AGENT_REGISTRY
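
The last step can be sketched as a small registration helper. This is a minimal illustration, assuming the registry layout used by the orchestrator. The `register_agent` function, port 8003, and the keyword list for the vulnerability scanner are all hypothetical.

```python
from typing import Any

# Same layout as AGENT_REGISTRY in the orchestrator.
AGENT_REGISTRY: dict = {
    "threat-intel": {
        "url": "http://threat-intel:8001",
        "keywords": ["ip", "domain", "threat", "ioc", "reputation", "malicious"],
    },
    "log-analysis": {
        "url": "http://log-analysis:8002",
        "keywords": ["log", "analyze", "parse", "suspicious", "pattern", "audit"],
    },
}


def register_agent(registry: dict, name: str, url: str, keywords: list) -> None:
    """Add a specialist agent, refusing to overwrite an existing entry."""
    if name in registry:
        raise ValueError(f"agent {name!r} is already registered")
    # The orchestrator lowercases queries before routing, so store lowercase keywords
    registry[name] = {"url": url, "keywords": [k.lower() for k in keywords]}


# Hypothetical entry: the service name, port, and keywords are assumptions.
register_agent(
    AGENT_REGISTRY,
    "vuln-scanner",
    "http://vuln-scanner:8003",
    ["vulnerability", "CVE", "scan", "patch"],
)
```

The URL host must match the service name given to the new container in docker-compose.yml, because the orchestrator reaches it over the shared Docker network.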

Production Considerations

For production deployments, consider:

  • Authentication: Implement OAuth2 or API key authentication
  • Service Discovery: Use Consul, etcd, or Kubernetes service discovery
  • Observability: Add OpenTelemetry tracing and Prometheus metrics
  • Rate Limiting: Implement rate limiting to prevent abuse
  • TLS: Enable HTTPS for all agent communication
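
As one illustration of the rate-limiting item, here is a minimal token-bucket sketch. It is an in-process example under stated assumptions: the `TokenBucket` class, its parameters, and the injectable clock are hypothetical, and a real deployment would more likely enforce limits at a gateway or reverse proxy.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow up to `capacity` requests in a burst, refilled at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = float(capacity)
        self._last = clock()

    def allow(self) -> bool:
        """Consume one token if available and report whether the request may proceed."""
        now = self._clock()
        # Refill in proportion to elapsed time, capped at the bucket capacity
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= 1.0:
            self._tokens -= 1.0
            return True
        return False
```

An agent could keep one bucket per caller and reject a request when `allow()` returns `False`. Passing the clock in makes the refill logic testable without sleeping.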

Cleanup

To stop and remove all containers:

# Stop all agents
docker-compose down

# Remove images
docker-compose down --rmi all

# Remove volumes
docker-compose down -v

Conclusion

We’ve built a functional multi-agent system using the A2A protocol with three specialized agents that communicate and collaborate on cybersecurity analysis tasks. This architecture demonstrates key A2A concepts:

  • Agent Discovery: Each agent publishes its capabilities via Agent Cards
  • Task Delegation: The orchestrator routes tasks to appropriate specialists
  • Opaque Execution: Agents collaborate without exposing internal implementation
  • Standardized Communication: All agents use the same A2A protocol

The A2A protocol opens up possibilities for building modular, scalable AI systems where specialized agents can be developed, deployed, and upgraded independently while maintaining interoperability.
