Model Context Protocol (MCP): Building Secure AI Tool Interfaces

| 10 min read

The Model Context Protocol (MCP), introduced by Anthropic in late 2024, is rapidly becoming the standard way to give AI assistants access to external tools, data sources, and capabilities. While the A2A protocol I covered previously enables agent-to-agent communication, MCP solves a different but equally important problem: giving a single agent controlled access to your systems and data.

In this guide, I’ll explain what MCP is, why it matters from a security perspective, and walk through building a production-ready MCP server with proper security controls.

What is MCP?

MCP is an open protocol that standardizes how AI assistants connect to external systems. Think of it as a plugin architecture with strong security boundaries:

┌─────────────────────────────────────────────────────────────┐
│                    MCP Host (AI Assistant)                  │
│                  Claude Desktop / IDE / etc.                 │
└──────────────────────────┬──────────────────────────────────┘
                           │ MCP Protocol (JSON-RPC)

           ┌───────────────┼───────────────┐
           │               │               │
           ▼               ▼               ▼
    ┌──────────┐    ┌──────────┐    ┌──────────┐
    │  MCP      │    │  MCP      │    │  MCP      │
    │  Server   │    │  Server   │    │  Server   │
    │           │    │           │    │           │
    │ Filesystem│    │ Database  │    │ Custom    │
    │ Access    │    │ Queries   │    │ API Tools │
    └──────────┘    └──────────┘    └──────────┘

Why MCP Matters

Before MCP, integrating AI assistants with external systems was ad-hoc:

  • Security by obscurity - Custom integrations often bypassed security reviews
  • No standard permissions - Each integration handled access control differently
  • Audit trail gaps - Hard to track what AI systems actually accessed
  • Vendor lock-in - Integrations were specific to each AI provider

MCP solves these by providing:

  • Standard permission model - Explicit capabilities requested and granted
  • Consistent API - Same protocol regardless of AI provider
  • Built-in tooling - Resources, prompts, and tools as first-class concepts
  • Audit logging - Clear interface for tracking all interactions

MCP Core Concepts

Resources

Resources represent data the AI can read but not modify:

// Example: Expose a log file as a resource
{
  uri: "file:///var/log/security/app.log",
  name: "Application Security Log",
  mimeType: "text/plain",
  description: "Read-only access to security event logs"
}

Resources are ideal for:

  • Configuration files
  • Log files
  • Database schemas
  • Documentation

Tools

Tools are actions the AI can invoke with parameters:

// Example: A security scanning tool
{
  name: "scan_ip_reputation",
  description: "Check IP address reputation against threat intelligence feeds",
  inputSchema: {
    type: "object",
    properties: {
      ip_address: { 
        type: "string",
        format: "ipv4",
        description: "IP address to investigate"
      },
      include_history: {
        type: "boolean",
        description: "Include historical reputation data"
      }
    },
    required: ["ip_address"]
  }
}

Prompts

Predefined prompt templates for common tasks:

// Example: Incident analysis prompt
{
  name: "analyze_incident",
  description: "Comprehensive incident analysis workflow",
  arguments: [
    {
      name: "incident_id",
      description: "Unique incident identifier",
      required: true
    }
  ]
}

Security Model

The Permission Boundary

MCP implements a capability-based security model. When an MCP server starts, it declares what it can do, and the host must explicitly grant those capabilities.

┌─────────────────────────────────────────────────────────────┐
│                      Permission Request                      │
│  "This server wants to: read ~/projects/**, execute git"    │
└─────────────────────────────────────────────────────────────┘


              ┌────────────────────────┐
              │   User Approval UI     │
              │  [Allow] [Deny] [...]  │
              └────────────────────────┘

              ┌────────────┴────────────┐
              │                         │
              ▼                         ▼
        ┌──────────┐              ┌──────────┐
        │ Approved │              │ Denied   │
        │ Session  │              │ Session  │
        └──────────┘              └──────────┘

This is significantly better than giving an AI unrestricted filesystem or network access.

Security Considerations

Even with permissions, MCP servers require careful design:

1. Principle of Least Privilege

Only expose what’s absolutely necessary:

# BAD: Exposing entire filesystem
@server.list_resources()
async def list_resources():
    return [Resource(uri="file:///", name="Root Filesystem")]

# GOOD: Specific, bounded access
@server.list_resources()
async def list_resources():
    return [
        Resource(
            uri="file:///home/user/safe-project/allowed-dir",
            name="Project Directory"
        )
    ]

2. Input Validation

Never trust AI-generated input:

from pydantic import BaseModel, validator
import ipaddress

class IPScanRequest(BaseModel):
    ip_address: str
    
    @validator('ip_address')
    def validate_ip(cls, v):
        try:
            ipaddress.ip_address(v)
        except ValueError:
            raise ValueError(f"Invalid IP address: {v}")
        
        # Prevent scanning internal networks
        if ipaddress.ip_address(v).is_private:
            raise ValueError("Cannot scan private IP addresses")
        
        return v

3. Rate Limiting

AI agents can hammer APIs unexpectedly:

import asyncio
from collections import defaultdict
from datetime import datetime, timedelta

class RateLimiter:
    def __init__(self, max_calls: int = 10, window_seconds: int = 60):
        self.max_calls = max_calls
        self.window = timedelta(seconds=window_seconds)
        self.calls = defaultdict(list)
    
    async def acquire(self, key: str) -> bool:
        now = datetime.now()
        
        # Clean old entries
        self.calls[key] = [
            t for t in self.calls[key] 
            if now - t < self.window
        ]
        
        if len(self.calls[key]) >= self.max_calls:
            return False
        
        self.calls[key].append(now)
        return True
    
    async def wait_and_acquire(self, key: str, timeout: int = 30):
        start = asyncio.get_event_loop().time()
        while asyncio.get_event_loop().time() - start < timeout:
            if await self.acquire(key):
                return True
            await asyncio.sleep(1)
        raise TimeoutError(f"Rate limit exceeded for {key}")

4. Audit Logging

Log every action for forensic purposes:

import json
import logging
from datetime import datetime
from typing import Any

class SecurityAuditLogger:
    def __init__(self, log_file: str = "mcp_audit.log"):
        self.logger = logging.getLogger("mcp_audit")
        handler = logging.FileHandler(log_file)
        handler.setFormatter(logging.Formatter('%(message)s'))
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)
    
    def log_tool_call(
        self, 
        tool_name: str, 
        arguments: dict, 
        result: Any,
        success: bool,
        user: str = "unknown"
    ):
        entry = {
            "timestamp": datetime.now().isoformat(),
            "event": "tool_call",
            "tool": tool_name,
            "arguments": self._sanitize(arguments),
            "result_preview": self._sanitize(str(result)[:500]),
            "success": success,
            "user": user
        }
        self.logger.info(json.dumps(entry))
    
    def _sanitize(self, obj: Any) -> Any:
        """Remove sensitive fields from logs."""
        SENSITIVE_FIELDS = {"password", "token", "secret", "api_key", "credential"}
        
        if isinstance(obj, dict):
            return {
                k: "[REDACTED]" if k.lower() in SENSITIVE_FIELDS else v
                for k, v in obj.items()
            }
        return obj

Building a Security MCP Server

Let’s build a complete MCP server for security operations with proper controls:

# security_mcp_server.py
import asyncio
import json
import subprocess
from pathlib import Path
from typing import Any, Optional
from pydantic import BaseModel, validator
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, Resource, TextContent

# Configuration
ALLOWED_LOG_PATHS = [
    "/var/log/auth.log",
    "/var/log/syslog",
    "/var/log/nginx/access.log"
]

MAX_LOG_SIZE = 10 * 1024 * 1024  # 10MB
RATE_LIMIT_CALLS = 20
RATE_LIMIT_WINDOW = 60  # seconds

# Validated request models
class LogQuery(BaseModel):
    log_path: str
    pattern: str
    max_lines: int = 100
    
    @validator('log_path')
    def validate_path(cls, v):
        if v not in ALLOWED_LOG_PATHS:
            raise ValueError(f"Log path not in allowed list: {v}")
        return v
    
    @validator('max_lines')
    def validate_lines(cls, v):
        if v > 1000:
            raise ValueError("max_lines cannot exceed 1000")
        return v

class IPQuery(BaseModel):
    ip_address: str
    
    @validator('ip_address')
    def validate_ip(cls, v):
        import ipaddress
        try:
            addr = ipaddress.ip_address(v)
            # Prevent internal scanning
            if addr.is_private:
                raise ValueError("Cannot query private IP addresses")
            return v
        except ValueError:
            raise ValueError(f"Invalid IP address: {v}")

# Initialize server
app = Server("security-tools")
audit_logger = SecurityAuditLogger()
rate_limiter = RateLimiter(RATE_LIMIT_CALLS, RATE_LIMIT_WINDOW)

# Declare capabilities
@app.list_tools()
async def list_tools():
    return [
        Tool(
            name="query_logs",
            description="Search log files for patterns",
            inputSchema={
                "type": "object",
                "properties": {
                    "log_path": {
                        "type": "string",
                        "description": "Path to log file",
                        "enum": ALLOWED_LOG_PATHS
                    },
                    "pattern": {
                        "type": "string",
                        "description": "Regex pattern to search for"
                    },
                    "max_lines": {
                        "type": "integer",
                        "description": "Maximum lines to return",
                        "default": 100
                    }
                },
                "required": ["log_path", "pattern"]
            }
        ),
        Tool(
            name="check_ip_reputation",
            description="Check IP reputation against threat intel",
            inputSchema={
                "type": "object",
                "properties": {
                    "ip_address": {
                        "type": "string",
                        "description": "IP address to check"
                    }
                },
                "required": ["ip_address"]
            }
        )
    ]

@app.list_resources()
async def list_resources():
    return [
        Resource(
            uri=f"file://{path}",
            name=Path(path).name,
            mimeType="text/plain"
        )
        for path in ALLOWED_LOG_PATHS
    ]

# Implement tools
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    try:
        # Wait for rate limit
        await rate_limiter.wait_and_acquire(name)
        
        if name == "query_logs":
            return await _query_logs(arguments)
        elif name == "check_ip_reputation":
            return await _check_ip_reputation(arguments)
        else:
            raise ValueError(f"Unknown tool: {name}")
    except Exception as e:
        audit_logger.log_tool_call(name, arguments, str(e), False)
        return [TextContent(type="text", text=f"Error: {str(e)}")]

async def _query_logs(args: dict) -> list[TextContent]:
    # Validate
    query = LogQuery(**args)
    
    # Execute safely
    try:
        # Use grep with timeout
        proc = await asyncio.create_subprocess_exec(
            "grep", "-E", query.pattern, query.log_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        
        try:
            stdout, stderr = await asyncio.wait_for(
                proc.communicate(), 
                timeout=10
            )
            output = stdout.decode()[:MAX_LOG_SIZE]
        except asyncio.TimeoutError:
            proc.kill()
            raise RuntimeError("Log query timed out")
        
        lines = output.split("\n")[:query.max_lines]
        result = "\n".join(lines)
        
        audit_logger.log_tool_call(
            "query_logs", args, 
            f"{len(lines)} lines returned", 
            True
        )
        
        return [TextContent(type="text", text=result)]
    except Exception as e:
        audit_logger.log_tool_call("query_logs", args, str(e), False)
        raise

async def _check_ip_reputation(args: dict) -> list[TextContent]:
    query = IPQuery(**args)
    
    # Call threat intel API (simulated)
    result = await _query_threat_intel(query.ip_address)
    
    audit_logger.log_tool_call(
        "check_ip_reputation", 
        args, 
        result, 
        True
    )
    
    return [TextContent(type="text", text=json.dumps(result, indent=2))]

async def _query_threat_intel(ip: str) -> dict:
    # Placeholder - integrate with your threat intel API
    return {
        "ip": ip,
        "reputation": "unknown",
        "categories": [],
        "last_seen": None,
        "source": "local_database"
    }

# Run server
async def main():
    async with stdio_server() as (read_stream, write_stream):
        await app.run(read_stream, write_stream)

if __name__ == "__main__":
    asyncio.run(main())

Running Your MCP Server

With Claude Desktop

Add to your Claude Desktop configuration:

// ~/Library/Application Support/Claude/claude_desktop_config.json (macOS)
{
  "mcpServers": {
    "security-tools": {
      "command": "python",
      "args": ["/path/to/security_mcp_server.py"],
      "env": {
        "LOG_LEVEL": "INFO"
      }
    }
  }
}

With Other MCP Hosts

Most MCP hosts support similar configuration. The key fields:

  • command: Executable to run the server
  • args: Command-line arguments
  • env: Environment variables
  • disabled: Set to true to temporarily disable

Security Checklist

Before deploying an MCP server to production:

  • Path traversal protection - Validate all file paths against allowlists
  • Input validation - Use Pydantic or similar for all inputs
  • Rate limiting - Prevent DoS from runaway AI agents
  • Audit logging - Log all tool calls with sanitized arguments
  • Timeout handling - All external calls must have timeouts
  • Credential isolation - Never log or return secrets
  • Minimal permissions - Only request necessary capabilities
  • Error handling - Don’t leak internal details in errors
  • Resource limits - Cap response sizes and execution time
  • Testing - Security-focused integration tests

MCP vs Direct API Integration

AspectMCPDirect API
Standard protocol
Permission model✅ Built-in❌ Custom
Audit logging✅ Standard interface⚠️ Manual
AI provider agnostic
Implementation overhead⚠️ More setup✅ Less code
Security by default⚠️ Manual

Conclusion

MCP represents a significant step forward in safely integrating AI assistants with external systems. By standardizing the interface between AI and tools, it provides a consistent security boundary that’s easier to audit and control.

The key security principles remain unchanged: least privilege, input validation, rate limiting, and comprehensive logging. MCP simply provides a standard framework for implementing these consistently across all AI integrations.

For security-conscious organizations, MCP servers should be:

  1. Treated as privileged access points
  2. Reviewed with the same rigor as API endpoints
  3. Monitored for anomalous usage patterns
  4. Isolated in separate security contexts when possible

The standard is evolving rapidly, so stay updated on the official MCP specification for the latest security features and best practices.


~Amit

References