# Model Context Protocol (MCP): Building Secure AI Tool Interfaces
The Model Context Protocol (MCP), introduced by Anthropic in late 2024, is rapidly becoming the standard way to give AI assistants access to external tools, data sources, and capabilities. While the A2A protocol I covered previously enables agent-to-agent communication, MCP solves a different but equally important problem: giving a single agent controlled access to your systems and data.
In this guide, I’ll explain what MCP is, why it matters from a security perspective, and walk through building a production-ready MCP server with proper security controls.
## What is MCP?

MCP is an open protocol that standardizes how AI assistants connect to external systems. Think of it as a plugin architecture with strong security boundaries:

```
┌─────────────────────────────────────────────────────────────┐
│                   MCP Host (AI Assistant)                   │
│                 Claude Desktop / IDE / etc.                 │
└──────────────────────────┬──────────────────────────────────┘
                           │  MCP Protocol (JSON-RPC)
                           │
           ┌───────────────┼───────────────┐
           │               │               │
           ▼               ▼               ▼
     ┌──────────┐    ┌──────────┐    ┌──────────┐
     │   MCP    │    │   MCP    │    │   MCP    │
     │  Server  │    │  Server  │    │  Server  │
     │          │    │          │    │          │
     │Filesystem│    │ Database │    │  Custom  │
     │  Access  │    │ Queries  │    │API Tools │
     └──────────┘    └──────────┘    └──────────┘
```
## Why MCP Matters

Before MCP, integrating AI assistants with external systems was ad hoc:

- **Security by obscurity** - Custom integrations often bypassed security reviews
- **No standard permissions** - Each integration handled access control differently
- **Audit trail gaps** - Hard to track what AI systems actually accessed
- **Vendor lock-in** - Integrations were specific to each AI provider

MCP solves these by providing:

- **Standard permission model** - Explicit capabilities requested and granted
- **Consistent API** - Same protocol regardless of AI provider
- **Built-in tooling** - Resources, prompts, and tools as first-class concepts
- **Audit logging** - Clear interface for tracking all interactions
## MCP Core Concepts

### Resources

Resources represent data the AI can read but not modify:

```javascript
// Example: Expose a log file as a resource
{
  uri: "file:///var/log/security/app.log",
  name: "Application Security Log",
  mimeType: "text/plain",
  description: "Read-only access to security event logs"
}
```

Resources are ideal for:

- Configuration files
- Log files
- Database schemas
- Documentation
### Tools

Tools are actions the AI can invoke with parameters:

```javascript
// Example: A security scanning tool
{
  name: "scan_ip_reputation",
  description: "Check IP address reputation against threat intelligence feeds",
  inputSchema: {
    type: "object",
    properties: {
      ip_address: {
        type: "string",
        format: "ipv4",
        description: "IP address to investigate"
      },
      include_history: {
        type: "boolean",
        description: "Include historical reputation data"
      }
    },
    required: ["ip_address"]
  }
}
```
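A schema like this is only a contract; the server still has to enforce it at call time. A minimal hand-rolled check for the schema above, using only the standard library (a real server would typically use a JSON Schema validator instead), might look like:

```python
import ipaddress

def validate_scan_args(args: dict) -> list[str]:
    """Return a list of validation errors for scan_ip_reputation arguments."""
    errors = []
    # 'ip_address' is listed in the schema's required fields
    if "ip_address" not in args:
        errors.append("missing required field: ip_address")
    else:
        try:
            ipaddress.IPv4Address(args["ip_address"])  # enforce format: ipv4
        except ValueError:
            errors.append("ip_address is not a valid IPv4 address")
    # Optional field must still match its declared type
    if "include_history" in args and not isinstance(args["include_history"], bool):
        errors.append("include_history must be a boolean")
    return errors

print(validate_scan_args({"ip_address": "198.51.100.1"}))  # []
print(validate_scan_args({"include_history": "yes"}))
```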
### Prompts

Predefined prompt templates for common tasks:

```javascript
// Example: Incident analysis prompt
{
  name: "analyze_incident",
  description: "Comprehensive incident analysis workflow",
  arguments: [
    {
      name: "incident_id",
      description: "Unique incident identifier",
      required: true
    }
  ]
}
```
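Prompt templates are filled in server-side before being handed back to the host. A hypothetical sketch of how a server might render the `analyze_incident` template while enforcing its required argument (the template text itself is invented for illustration):

```python
def render_prompt(arguments: dict) -> str:
    """Render the analyze_incident prompt, enforcing required arguments."""
    if "incident_id" not in arguments:
        raise ValueError("missing required argument: incident_id")
    # Hypothetical template text - a real workflow prompt would be richer
    return (
        f"Analyze incident {arguments['incident_id']}: summarize the timeline, "
        "affected assets, and recommended containment steps."
    )

print(render_prompt({"incident_id": "INC-2041"}))
```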
## Security Model

### The Permission Boundary

MCP implements a capability-based security model. When an MCP server starts, it declares what it can do, and the host must explicitly grant those capabilities.
```
┌─────────────────────────────────────────────────────────────┐
│                     Permission Request                      │
│  "This server wants to: read ~/projects/**, execute git"    │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
                  ┌────────────────────────┐
                  │    User Approval UI    │
                  │  [Allow] [Deny] [...]  │
                  └────────────────────────┘
                              │
                 ┌────────────┴────────────┐
                 │                         │
                 ▼                         ▼
           ┌──────────┐             ┌──────────┐
           │ Approved │             │  Denied  │
           │ Session  │             │ Session  │
           └──────────┘             └──────────┘
```
This is significantly better than giving an AI unrestricted filesystem or network access.
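A host can model an approved session as simply the set of capabilities the user granted, with everything outside that set denied by default. A toy sketch of the idea (the capability strings are illustrative, not the MCP wire format):

```python
class Session:
    """Tracks which declared capabilities the user approved for one session."""

    def __init__(self, requested: set[str]):
        self.requested = requested
        self.granted: set[str] = set()

    def approve(self, capability: str) -> None:
        # A host should never grant something the server didn't declare
        if capability not in self.requested:
            raise ValueError(f"capability was never requested: {capability}")
        self.granted.add(capability)

    def is_allowed(self, capability: str) -> bool:
        # Default-deny: anything not explicitly granted is refused
        return capability in self.granted

session = Session(requested={"read:~/projects/**", "execute:git"})
session.approve("read:~/projects/**")
print(session.is_allowed("read:~/projects/**"))  # True
print(session.is_allowed("execute:git"))         # False - requested but never granted
```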
### Security Considerations

Even with permissions, MCP servers require careful design:

#### 1. Principle of Least Privilege

Only expose what’s absolutely necessary:

```python
# BAD: Exposing entire filesystem
@server.list_resources()
async def list_resources():
    return [Resource(uri="file:///", name="Root Filesystem")]

# GOOD: Specific, bounded access
@server.list_resources()
async def list_resources():
    return [
        Resource(
            uri="file:///home/user/safe-project/allowed-dir",
            name="Project Directory"
        )
    ]
```
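Bounding resources to a directory is only half the job: when the host later asks to read a specific file, the server must verify the resolved path still falls inside that directory, or a crafted `../` in the URI can escape it. One common stdlib approach (Python 3.9+ for `is_relative_to`; the directory name matches the example above):

```python
from pathlib import Path

ALLOWED_ROOT = Path("/home/user/safe-project/allowed-dir")

def resolve_safely(requested: str) -> Path:
    """Resolve a requested path and refuse anything outside ALLOWED_ROOT."""
    # resolve() collapses '..' segments and symlinks before the check
    candidate = (ALLOWED_ROOT / requested).resolve()
    if not candidate.is_relative_to(ALLOWED_ROOT.resolve()):
        raise PermissionError(f"path escapes allowed directory: {requested}")
    return candidate

print(resolve_safely("notes/readme.txt"))
```

Checking the *resolved* path matters: a naive string-prefix check on the raw input is defeated by both `..` segments and symlinks.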
#### 2. Input Validation

Never trust AI-generated input:

```python
from pydantic import BaseModel, validator
import ipaddress

class IPScanRequest(BaseModel):
    ip_address: str

    @validator('ip_address')
    def validate_ip(cls, v):
        try:
            ipaddress.ip_address(v)
        except ValueError:
            raise ValueError(f"Invalid IP address: {v}")
        # Prevent scanning internal networks
        if ipaddress.ip_address(v).is_private:
            raise ValueError("Cannot scan private IP addresses")
        return v
```
#### 3. Rate Limiting

AI agents can hammer APIs unexpectedly:

```python
import asyncio
from collections import defaultdict
from datetime import datetime, timedelta

class RateLimiter:
    def __init__(self, max_calls: int = 10, window_seconds: int = 60):
        self.max_calls = max_calls
        self.window = timedelta(seconds=window_seconds)
        self.calls = defaultdict(list)

    async def acquire(self, key: str) -> bool:
        now = datetime.now()
        # Clean old entries
        self.calls[key] = [
            t for t in self.calls[key]
            if now - t < self.window
        ]
        if len(self.calls[key]) >= self.max_calls:
            return False
        self.calls[key].append(now)
        return True

    async def wait_and_acquire(self, key: str, timeout: int = 30):
        start = asyncio.get_event_loop().time()
        while asyncio.get_event_loop().time() - start < timeout:
            if await self.acquire(key):
                return True
            await asyncio.sleep(1)
        raise TimeoutError(f"Rate limit exceeded for {key}")
```
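To see the sliding-window behavior directly, here is a condensed synchronous version of the same idea exercised in isolation (a sketch for illustration; the async class above is what the server actually uses):

```python
from collections import defaultdict
from datetime import datetime, timedelta

class SyncRateLimiter:
    """Synchronous sliding-window limiter mirroring the async version."""

    def __init__(self, max_calls: int = 3, window_seconds: int = 60):
        self.max_calls = max_calls
        self.window = timedelta(seconds=window_seconds)
        self.calls = defaultdict(list)

    def acquire(self, key: str) -> bool:
        now = datetime.now()
        # Drop timestamps that have aged out of the window
        self.calls[key] = [t for t in self.calls[key] if now - t < self.window]
        if len(self.calls[key]) >= self.max_calls:
            return False
        self.calls[key].append(now)
        return True

limiter = SyncRateLimiter(max_calls=3, window_seconds=60)
results = [limiter.acquire("query_logs") for _ in range(5)]
print(results)  # [True, True, True, False, False]
```

Note that the window slides rather than resets: once the oldest timestamp ages out, capacity is restored one call at a time.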
#### 4. Audit Logging

Log every action for forensic purposes:

```python
import json
import logging
from datetime import datetime
from typing import Any

class SecurityAuditLogger:
    def __init__(self, log_file: str = "mcp_audit.log"):
        self.logger = logging.getLogger("mcp_audit")
        handler = logging.FileHandler(log_file)
        handler.setFormatter(logging.Formatter('%(message)s'))
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_tool_call(
        self,
        tool_name: str,
        arguments: dict,
        result: Any,
        success: bool,
        user: str = "unknown"
    ):
        entry = {
            "timestamp": datetime.now().isoformat(),
            "event": "tool_call",
            "tool": tool_name,
            "arguments": self._sanitize(arguments),
            "result_preview": self._sanitize(str(result)[:500]),
            "success": success,
            "user": user
        }
        self.logger.info(json.dumps(entry))

    def _sanitize(self, obj: Any) -> Any:
        """Remove sensitive fields from logs."""
        SENSITIVE_FIELDS = {"password", "token", "secret", "api_key", "credential"}
        if isinstance(obj, dict):
            return {
                k: "[REDACTED]" if k.lower() in SENSITIVE_FIELDS else v
                for k, v in obj.items()
            }
        return obj
```
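One caveat: the `_sanitize` method above only redacts top-level keys, so a secret nested inside another dictionary passes through untouched. A recursive variant closes that gap (a sketch extending the idea, not part of the class above):

```python
SENSITIVE_FIELDS = {"password", "token", "secret", "api_key", "credential"}

def sanitize(obj):
    """Recursively redact sensitive keys in nested dicts and lists."""
    if isinstance(obj, dict):
        return {
            k: "[REDACTED]" if k.lower() in SENSITIVE_FIELDS else sanitize(v)
            for k, v in obj.items()
        }
    if isinstance(obj, list):
        return [sanitize(item) for item in obj]
    return obj

entry = {"user": "amit", "auth": {"api_key": "abc123"}, "args": ["ok"]}
print(sanitize(entry))  # {'user': 'amit', 'auth': {'api_key': '[REDACTED]'}, 'args': ['ok']}
```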
## Building a Security MCP Server

Let’s build a complete MCP server for security operations with proper controls:
```python
# security_mcp_server.py
import asyncio
import json
from pathlib import Path

from pydantic import BaseModel, validator
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, Resource, TextContent

# RateLimiter and SecurityAuditLogger are the classes defined in the
# sections above (import them from wherever you place that code).

# Configuration
ALLOWED_LOG_PATHS = [
    "/var/log/auth.log",
    "/var/log/syslog",
    "/var/log/nginx/access.log"
]
MAX_LOG_SIZE = 10 * 1024 * 1024  # 10MB
RATE_LIMIT_CALLS = 20
RATE_LIMIT_WINDOW = 60  # seconds

# Validated request models
class LogQuery(BaseModel):
    log_path: str
    pattern: str
    max_lines: int = 100

    @validator('log_path')
    def validate_path(cls, v):
        if v not in ALLOWED_LOG_PATHS:
            raise ValueError(f"Log path not in allowed list: {v}")
        return v

    @validator('max_lines')
    def validate_lines(cls, v):
        if v > 1000:
            raise ValueError("max_lines cannot exceed 1000")
        return v

class IPQuery(BaseModel):
    ip_address: str

    @validator('ip_address')
    def validate_ip(cls, v):
        import ipaddress
        try:
            addr = ipaddress.ip_address(v)
        except ValueError:
            raise ValueError(f"Invalid IP address: {v}")
        # Prevent internal scanning (checked outside the try block so this
        # error isn't swallowed and re-raised as "Invalid IP address")
        if addr.is_private:
            raise ValueError("Cannot query private IP addresses")
        return v

# Initialize server
app = Server("security-tools")
audit_logger = SecurityAuditLogger()
rate_limiter = RateLimiter(RATE_LIMIT_CALLS, RATE_LIMIT_WINDOW)

# Declare capabilities
@app.list_tools()
async def list_tools():
    return [
        Tool(
            name="query_logs",
            description="Search log files for patterns",
            inputSchema={
                "type": "object",
                "properties": {
                    "log_path": {
                        "type": "string",
                        "description": "Path to log file",
                        "enum": ALLOWED_LOG_PATHS
                    },
                    "pattern": {
                        "type": "string",
                        "description": "Regex pattern to search for"
                    },
                    "max_lines": {
                        "type": "integer",
                        "description": "Maximum lines to return",
                        "default": 100
                    }
                },
                "required": ["log_path", "pattern"]
            }
        ),
        Tool(
            name="check_ip_reputation",
            description="Check IP reputation against threat intel",
            inputSchema={
                "type": "object",
                "properties": {
                    "ip_address": {
                        "type": "string",
                        "description": "IP address to check"
                    }
                },
                "required": ["ip_address"]
            }
        )
    ]

@app.list_resources()
async def list_resources():
    return [
        Resource(
            uri=f"file://{path}",
            name=Path(path).name,
            mimeType="text/plain"
        )
        for path in ALLOWED_LOG_PATHS
    ]

# Implement tools
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    try:
        # Wait for rate limit
        await rate_limiter.wait_and_acquire(name)

        if name == "query_logs":
            return await _query_logs(arguments)
        elif name == "check_ip_reputation":
            return await _check_ip_reputation(arguments)
        else:
            raise ValueError(f"Unknown tool: {name}")
    except Exception as e:
        audit_logger.log_tool_call(name, arguments, str(e), False)
        return [TextContent(type="text", text=f"Error: {str(e)}")]

async def _query_logs(args: dict) -> list[TextContent]:
    # Validate
    query = LogQuery(**args)

    # Execute safely
    try:
        # Use grep with a timeout
        proc = await asyncio.create_subprocess_exec(
            "grep", "-E", query.pattern, query.log_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        try:
            stdout, stderr = await asyncio.wait_for(
                proc.communicate(),
                timeout=10
            )
            output = stdout.decode()[:MAX_LOG_SIZE]
        except asyncio.TimeoutError:
            proc.kill()
            raise RuntimeError("Log query timed out")

        lines = output.split("\n")[:query.max_lines]
        result = "\n".join(lines)

        audit_logger.log_tool_call(
            "query_logs", args,
            f"{len(lines)} lines returned",
            True
        )
        return [TextContent(type="text", text=result)]
    except Exception as e:
        audit_logger.log_tool_call("query_logs", args, str(e), False)
        raise

async def _check_ip_reputation(args: dict) -> list[TextContent]:
    query = IPQuery(**args)

    # Call threat intel API (simulated)
    result = await _query_threat_intel(query.ip_address)

    audit_logger.log_tool_call(
        "check_ip_reputation",
        args,
        result,
        True
    )
    return [TextContent(type="text", text=json.dumps(result, indent=2))]

async def _query_threat_intel(ip: str) -> dict:
    # Placeholder - integrate with your threat intel API
    return {
        "ip": ip,
        "reputation": "unknown",
        "categories": [],
        "last_seen": None,
        "source": "local_database"
    }

# Run server
async def main():
    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())
```
## Running Your MCP Server

### With Claude Desktop
Add to your Claude Desktop configuration (on macOS this lives at `~/Library/Application Support/Claude/claude_desktop_config.json`):

```json
{
  "mcpServers": {
    "security-tools": {
      "command": "python",
      "args": ["/path/to/security_mcp_server.py"],
      "env": {
        "LOG_LEVEL": "INFO"
      }
    }
  }
}
```
### With Other MCP Hosts

Most MCP hosts support similar configuration. The key fields:

- `command`: Executable to run the server
- `args`: Command-line arguments
- `env`: Environment variables
- `disabled`: Set to `true` to temporarily disable
## Security Checklist

Before deploying an MCP server to production:

- **Path traversal protection** - Validate all file paths against allowlists
- **Input validation** - Use Pydantic or similar for all inputs
- **Rate limiting** - Prevent DoS from runaway AI agents
- **Audit logging** - Log all tool calls with sanitized arguments
- **Timeout handling** - All external calls must have timeouts
- **Credential isolation** - Never log or return secrets
- **Minimal permissions** - Only request necessary capabilities
- **Error handling** - Don’t leak internal details in errors
- **Resource limits** - Cap response sizes and execution time
- **Testing** - Security-focused integration tests
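For the error-handling item in particular, a useful pattern is to log full exception details server-side but return only a generic message with a correlation ID to the AI, so internal paths and stack traces never reach the model. A sketch (the logger name and message format are assumptions):

```python
import logging
import uuid

logger = logging.getLogger("mcp_errors")

def safe_error_response(exc: Exception) -> str:
    """Log full details server-side; return an opaque reference to the caller."""
    error_id = uuid.uuid4().hex[:8]
    # The real exception (with any sensitive paths) stays in server logs only
    logger.error("error %s: %r", error_id, exc)
    return f"Internal error (reference: {error_id})"

msg = safe_error_response(FileNotFoundError("/etc/shadow"))
print(msg)
```

An operator can later grep the audit log for the reference ID to recover the full context.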
## MCP vs Direct API Integration
| Aspect | MCP | Direct API |
|---|---|---|
| Standard protocol | ✅ | ❌ |
| Permission model | ✅ Built-in | ❌ Custom |
| Audit logging | ✅ Standard interface | ⚠️ Manual |
| AI provider agnostic | ✅ | ❌ |
| Implementation overhead | ⚠️ More setup | ✅ Less code |
| Security by default | ✅ | ⚠️ Manual |
## Conclusion
MCP represents a significant step forward in safely integrating AI assistants with external systems. By standardizing the interface between AI and tools, it provides a consistent security boundary that’s easier to audit and control.
The key security principles remain unchanged: least privilege, input validation, rate limiting, and comprehensive logging. MCP simply provides a standard framework for implementing these consistently across all AI integrations.
For security-conscious organizations, MCP servers should be:
- Treated as privileged access points
- Reviewed with the same rigor as API endpoints
- Monitored for anomalous usage patterns
- Isolated in separate security contexts when possible
The standard is evolving rapidly, so stay updated on the official MCP specification for the latest security features and best practices.
~Amit