Ssrf Server Side in Fastapi
How SSRF Manifests in FastAPI Applications
Server-Side Request Forgery (SSRF) in FastAPI applications occurs when an endpoint accepts user-controlled URLs and makes outbound HTTP requests without proper validation. FastAPI's async nature and powerful dependency injection system create specific SSRF attack vectors that developers must understand.
The most common SSRF pattern in FastAPI involves endpoints that accept URLs for processing:
from fastapi import FastAPI, HTTPException
from typing import Optional
import httpx
app = FastAPI()
@app.post("/fetch-content/")
async def fetch_content(url: str):
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.textThis seemingly innocent endpoint allows attackers to make the FastAPI server request any URL, including internal services. FastAPI's automatic request validation means URLs are parsed correctly, but this also means attackers can craft sophisticated payloads.
FastAPI-specific SSRF vectors include:
- Dependency Injection Abuse: FastAPI's dependency system can be exploited when dependencies accept URLs. A malicious dependency might look like:
async def get_external_service(url: str = Depends()) -> httpx.AsyncClient:
return httpx.AsyncClient(base_url=url)Attackers can manipulate the URL parameter to control the base URL of the HTTP client.
- Background Task SSRF: FastAPI's background tasks run after response completion, creating delayed SSRF opportunities:
from fastapi import BackgroundTasks
@app.post("/process-with-background/")
async def process_with_background(url: str, background_tasks: BackgroundTasks):
background_tasks.add_task(fetch_internal_data, url)
return {"status": "processing"}The background task continues execution after the client disconnects, potentially accessing internal services.
- Middleware-Based SSRF: Custom middleware that processes URLs can introduce SSRF:
class URLProcessingMiddleware:
def __init__(self, app: FastAPI):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] == "http":
# Malicious URL processing here
pass
return await self.app(scope, receive, send)FastAPI's async/await model means SSRF requests can be made concurrently, potentially overwhelming internal services or creating timing-based attacks.
FastAPI-Specific Detection Methods
Detecting SSRF in FastAPI requires understanding both the framework's patterns and the attack surface. Here are FastAPI-specific detection approaches:
Static Analysis: Scan FastAPI applications for SSRF-prone patterns:
import ast
import astor
def detect_ssrf_patterns(filepath):
with open(filepath) as f:
tree = ast.parse(f.read())
patterns = []
for node in ast.walk(tree):
# Detect async HTTP client usage
if isinstance(node, ast.AsyncWith) and 'httpx.AsyncClient' in astor.to_source(node):
patterns.append(astor.to_source(node))
# Detect URL parameters in FastAPI endpoints
if isinstance(node, ast.AsyncFunctionDef):
for arg in node.args.args:
if arg.annotation == ast.Name(id='str'):
patterns.append(f"Potential SSRF: {node.name} accepts str parameter")
return patternsRuntime Detection: Use middleware to log outbound requests:
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import httpx
class SSRFDetectionMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
# Check if response contains outbound requests
if hasattr(request.state, 'outbound_requests'):
for req in request.state.outbound_requests:
print(f"Outbound request detected: {req.url}")
return response
@app.middleware("http")
async def track_outbound_requests(request: Request, call_next):
request.state.outbound_requests = []
async def wrapper(scope, receive, send):
async def sending(send):
async for item in send:
if isinstance(item, dict) and 'http.response.start' in item:
# This is where you'd track responses
pass
yield item
return await call_next(request)
return await wrapper(request.scope, request.receive, sending)Automated Scanning with middleBrick: middleBrick's black-box scanning approach is particularly effective for FastAPI applications because it tests the actual running API without requiring source code access. The scanner tests SSRF by attempting to access internal services, cloud metadata endpoints, and other restricted resources.
middleBrick's SSRF detection for FastAPI includes:
- Testing for access to
http://169.254.169.254(AWS metadata) - Testing for access to
http://127.0.0.1andhttp://localhost - Testing for access to RFC 1918 private IP ranges
- Testing for access to cloud-specific endpoints (GCP, Azure metadata services)
The scanner provides specific findings with severity levels and remediation guidance tailored to FastAPI's async patterns.
FastAPI-Specific Remediation Techniques
Remediating SSRF in FastAPI requires a defense-in-depth approach that leverages FastAPI's features and Python's ecosystem.
URL Validation with FastAPI Dependencies: Create reusable SSRF protection as a dependency:
from fastapi import HTTPException, Depends
import httpx
from urllib.parse import urlparse
async def validate_url(url: str) -> str:
parsed = urlparse(url)
# Block private IP ranges
if parsed.hostname:
import ipaddress
try:
ip = ipaddress.ip_address(parsed.hostname)
if ip.is_private or ip.is_loopback:
raise HTTPException(status_code=400, detail="Private IP addresses are not allowed")
except ValueError:
# Not an IP address, could be a domain
pass
# Block cloud metadata endpoints
if parsed.netloc in ['169.254.169.254', 'metadata.google.internal', 'instance-data.azure.com']:
raise HTTPException(status_code=400, detail="Cloud metadata access is not allowed")
# Block common internal ports
if parsed.port in [2375, 2376, 2379, 2380, 4194, 5000, 5432, 5984, 6379, 8080, 8081, 8983, 9200]:
raise HTTPException(status_code=400, detail="Access to common internal ports is restricted")
return url
@app.post("/safe-fetch/")
async def safe_fetch(
url: str = Depends(validate_url),
client: httpx.AsyncClient = Depends(get_http_client)
):
response = await client.get(url)
return response.text
async def get_http_client():
return httpx.AsyncClient(timeout=10.0)FastAPI Middleware for SSRF Protection: Implement a global SSRF filter:
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import httpx
class SSRFProtectionMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
# Check if this is a request that might trigger SSRF
if request.method == 'POST' and request.headers.get('content-type') == 'application/json':
body = await request.json()
if 'url' in body:
await self.validate_url_for_ssrf(body['url'])
return await call_next(request)
async def validate_url_for_ssrf(self, url: str):
parsed = urlparse(url)
# Custom allowlist of domains
allowed_domains = ['api.example.com', 'trusted-service.com']
if parsed.netloc not in allowed_domains:
raise HTTPException(status_code=400, detail="URL not in allowlist")
app.add_middleware(SSRFProtectionMiddleware)Using FastAPI's Pydantic Models for Input Validation: Create strict models that prevent SSRF:
from pydantic import BaseModel, constr
from typing import Literal
class SafeURL(BaseModel):
url: constr(regex=r'^(https?://)?([a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}(:[0-9]+)?(/.*)?$')
class Config:
# Reject localhost and IP addresses
@classmethod
def validate(cls, value):
if 'localhost' in value.lower():
raise ValueError('localhost not allowed')
if value.startswith('http://127.') or value.startswith('http://192.168.'):
raise ValueError('Private IP not allowed')
return value
@app.post("/fetch-validated/")
async def fetch_validated(data: SafeURL):
async with httpx.AsyncClient() as client:
response = await client.get(data.url)
return response.textRate Limiting to Mitigate SSRF Impact: FastAPI's async nature makes rate limiting crucial:
from fastapi import FastAPI, HTTPException
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
app = FastAPI()
limiter = Limiter(key_func=get_remote_address, default_limits=["10/minute"]) # Limit SSRF impact
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/fetch-limited/")
@limiter.limit("5/minute")
async def fetch_limited(url: str):
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.textThese remediation techniques combine FastAPI's dependency injection, middleware system, and Pydantic validation to create a robust defense against SSRF attacks.