Rate Limiting Bypass in FastAPI
How Rate Limiting Bypass Manifests in FastAPI
Rate limiting bypass vulnerabilities in FastAPI applications often stem from improperly implemented rate limiting logic, particularly in middleware or decorators. The most common bypass vectors include:
- IP Spoofing: Rate limiters used with FastAPI often rely on request.client.host to identify clients. When that value is derived from forwarding headers, attackers can spoof X-Forwarded-For or use proxy chains to appear as different clients.
- API Key Rotation: When rate limits are enforced per API key but the application doesn't validate key integrity, attackers can generate or rotate keys to bypass limits.
- Token Manipulation: JWT-based rate limiting that trusts unverified claims can be manipulated by altering token payloads.
- Race Conditions: Non-atomic rate limit decrements in FastAPI's async endpoints can be exploited using concurrent requests.
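The token manipulation vector is easiest to see with a token whose payload is read before its signature is checked. The following is a minimal standard-library sketch, not a production JWT implementation: the helper names (`make_token`, `unverified_subject`, `verified_subject`, `forge_subject`), the use of the `sub` claim as the rate-limit key, and the hard-coded secret are all assumptions made for illustration.

```python
import base64
import hashlib
import hmac
import json

SECRET = b"demo-secret"  # Assumption: illustrative only; load real secrets from configuration


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def make_token(claims: dict) -> str:
    """Build an HS256-style token: header.payload.signature."""
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url(json.dumps(claims).encode())
    signature = hmac.new(SECRET, f"{header}.{payload}".encode(), hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64url(signature)}"


def unverified_subject(token: str) -> str:
    """Vulnerable: reads the rate-limit key from the payload without checking the signature."""
    payload = token.split(".")[1]
    return json.loads(_b64url_decode(payload))["sub"]


def verified_subject(token: str) -> str:
    """Safer: recompute the signature and compare in constant time before trusting claims."""
    header, payload, signature = token.split(".")
    expected = hmac.new(SECRET, f"{header}.{payload}".encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(_b64url(expected), signature):
        raise ValueError("invalid signature")
    return json.loads(_b64url_decode(payload))["sub"]


def forge_subject(token: str, new_sub: str) -> str:
    """Attacker step: swap the payload and keep the old signature."""
    header, _, signature = token.split(".")
    payload = _b64url(json.dumps({"sub": new_sub}).encode())
    return f"{header}.{payload}.{signature}"
```

A limiter keyed on `unverified_subject` accepts a forged token and hands the attacker a fresh bucket for every invented subject, while `verified_subject` rejects the same token with a `ValueError`.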
Here's a vulnerable FastAPI implementation that's susceptible to multiple bypass techniques:
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

app = FastAPI()

# Vulnerable when request.client.host is derived from an unvalidated
# X-Forwarded-For header (for example, a server or proxy layer that
# trusts forwarding headers from any peer)
limiter = Limiter(key_func=get_remote_address, default_limits=["10/minute"])
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


@app.get("/vulnerable")
@limiter.limit("10/minute")
async def vulnerable_endpoint(request: Request):
    return {"message": "This endpoint is rate limited"}
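When the server or an upstream proxy copies X-Forwarded-For into the client address without validation, an attacker can bypass this limit by sending a different forged value with each request: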
curl -H "X-Forwarded-For: 1.1.1.1" http://localhost:8000/vulnerable
curl -H "X-Forwarded-For: 2.2.2.2" http://localhost:8000/vulnerable
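To see why those two requests land in different buckets, the limiter's bookkeeping can be modelled without a server. This is a simplified sketch, assuming a fixed-window counter keyed on whatever string the key function returns; `FixedWindowCounter`, `naive_key`, `peer_key`, and `count_allowed` are hypothetical names, not slowapi APIs.

```python
from collections import defaultdict


class FixedWindowCounter:
    """Counts requests per key within a single window (window expiry omitted for brevity)."""

    def __init__(self, limit: int):
        self.limit = limit
        self.counts = defaultdict(int)

    def allow(self, key: str) -> bool:
        self.counts[key] += 1
        return self.counts[key] <= self.limit


def naive_key(peer_ip: str, headers: dict) -> str:
    # Vulnerable: the client fully controls this header
    return headers.get("X-Forwarded-For", peer_ip)


def peer_key(peer_ip: str, headers: dict) -> str:
    # Uses the TCP peer address and ignores client-supplied headers
    return peer_ip


def count_allowed(key_func, total_requests: int, limit: int = 10) -> int:
    """Send requests from one peer, each with a different forged header value."""
    counter = FixedWindowCounter(limit)
    allowed = 0
    for i in range(total_requests):
        headers = {"X-Forwarded-For": f"203.0.113.{i}"}
        if counter.allow(key_func("198.51.100.7", headers)):
            allowed += 1
    return allowed
```

With 50 requests from a single peer, `count_allowed(naive_key, 50)` admits all 50, while `count_allowed(peer_key, 50)` stops at the limit of 10.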
Another common pattern involves improper API key handling:
import secrets

from fastapi import Depends, FastAPI, Header, HTTPException
from pydantic import BaseModel

app = FastAPI()


class APIKey(BaseModel):
    key: str
    requests_remaining: int = 100


api_keys = {}


async def get_api_key(x_api_key: str = Header(...)) -> str:
    # Assumed helper (not defined in the original): reads the X-API-Key header
    return x_api_key


async def validate_api_key(api_key: str = Depends(get_api_key)) -> APIKey:
    if api_key not in api_keys:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return api_keys[api_key]


@app.post("/generate-key")
async def generate_key():
    # Vulnerable: no rate limiting on key generation
    new_key = secrets.token_urlsafe(32)  # Stands in for the undefined generate_random_key()
    api_keys[new_key] = APIKey(key=new_key)
    return {"api_key": new_key}


@app.get("/protected")
async def protected_endpoint(api_key: APIKey = Depends(validate_api_key)):
    if api_key.requests_remaining <= 0:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    api_key.requests_remaining -= 1  # Non-atomic check-then-decrement
    return {"data": "protected resource"}
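This implementation has two weaknesses. The check-then-decrement on requests_remaining is not atomic: as soon as an await separates the check from the update, or the counter moves to storage shared between workers, concurrent requests can all pass the check and push the counter below zero. The key generation endpoint also has no rate limiting, so an attacker can mint unlimited keys, each with a fresh quota.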
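The race window can be reproduced in isolation. The sketch below is a simplified model rather than the endpoint above: `Quota`, `racy_consume`, `locked_consume`, and `run_demo` are hypothetical names, and `asyncio.sleep(0)` stands in for whatever awaited I/O sits between the check and the decrement in a real handler.

```python
import asyncio


class Quota:
    def __init__(self, remaining: int):
        self.remaining = remaining


async def racy_consume(quota: Quota) -> bool:
    """Check, yield to the event loop, then decrement: the gap is the race window."""
    if quota.remaining <= 0:
        return False
    await asyncio.sleep(0)  # Stands in for any awaited I/O, such as a database read
    quota.remaining -= 1
    return True


async def locked_consume(quota: Quota, lock: asyncio.Lock) -> bool:
    """Hold a lock so the check and the decrement happen as one step."""
    async with lock:
        if quota.remaining <= 0:
            return False
        await asyncio.sleep(0)
        quota.remaining -= 1
        return True


async def run_demo(concurrency: int = 20, budget: int = 5):
    """Return (racy_allowed, racy_remaining, locked_allowed, locked_remaining)."""
    racy = Quota(budget)
    racy_results = await asyncio.gather(*(racy_consume(racy) for _ in range(concurrency)))

    safe = Quota(budget)
    lock = asyncio.Lock()
    safe_results = await asyncio.gather(*(locked_consume(safe, lock) for _ in range(concurrency)))
    return sum(racy_results), racy.remaining, sum(safe_results), safe.remaining
```

Under these assumptions `asyncio.run(run_demo())` returns `(20, -15, 5, 0)`: all 20 racy requests pass the check before any decrement runs, while the locked version admits exactly 5. An `asyncio.Lock` only protects a single process, so deployments with several workers still need an atomic counter in shared storage.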
FastAPI-Specific Detection
Detecting rate limiting bypass vulnerabilities in FastAPI requires both static analysis and dynamic testing. Here are the key detection strategies:
Static Analysis Patterns
Look for these anti-patterns in your FastAPI codebase:
# 1. Untrusted header usage for client identification
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    client_ip = request.headers.get("X-Forwarded-For", request.client.host)
    # Vulnerable if not validated against trusted proxy list
    return await call_next(request)


# 2. Non-atomic rate limit decrements
@app.get("/endpoint")
async def endpoint(counter: Counter = Depends(get_counter)):
    if counter.value <= 0:
        raise HTTPException(429)
    counter.value -= 1  # Race condition: not atomic
    return {"data": "response"}


# 3. Missing rate limiting on key/resource generation
@app.post("/generate-resource")
async def generate_resource():
    # No rate limiting - vulnerable to abuse
    return create_resource()
Dynamic Testing with middleBrick
middleBrick's rate limiting bypass detection specifically targets FastAPI applications by:
- Testing header manipulation (X-Forwarded-For, X-Real-IP, Forwarded) to identify IP-based rate limiting bypasses
- Analyzing API key generation endpoints for missing rate limits
- Detecting race conditions through concurrent request testing
- Identifying JWT-based rate limiting that trusts unverified claims
Using middleBrick's CLI to scan a FastAPI API:
npm install -g middlebrick
middlebrick scan https://api.example.com --output json
The scanner will identify specific vulnerabilities like:
{
  "rate_limiting_bypass": {
    "severity": "high",
    "finding": "API key generation endpoint at /api/keys lacks rate limiting",
    "impact": "Unlimited API key creation enables complete rate limit bypass",
    "remediation": "Add rate limiting to key generation endpoint using @limiter.limit(\"5/minute\")"
  }
}
middleBrick also tests for common FastAPI-specific bypass patterns by sending requests with various header combinations and comparing the responses to detect inconsistent rate limiting behavior.
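The header-combination idea can also be approximated by hand. The following is a generic sketch of such a probe and makes no claim about how middleBrick works internally: `probe_header_bypass` is a hypothetical function, and `send` is an assumed callable that performs one request with the given headers and returns the HTTP status code.

```python
from typing import Callable, Dict, List

SPOOF_HEADERS = ["X-Forwarded-For", "X-Real-IP", "Forwarded"]


def _header_value(name: str, ip: str) -> str:
    # The Forwarded header (RFC 7239) uses a for= parameter rather than a bare address
    return f"for={ip}" if name == "Forwarded" else ip


def probe_header_bypass(send: Callable[[Dict[str, str]], int], attempts: int = 30) -> List[str]:
    """Return the headers that let requests keep succeeding after a 429 was observed."""
    # Step 1: exhaust the limit with plain requests
    if not any(send({}) == 429 for _ in range(attempts)):
        return []  # No limit observed within `attempts`, so there is nothing to bypass

    # Step 2: retry with a fresh spoofed address on every request
    bypassable = []
    for name in SPOOF_HEADERS:
        statuses = [
            send({name: _header_value(name, f"203.0.113.{i + 1}")})
            for i in range(attempts)
        ]
        if all(status != 429 for status in statuses):
            bypassable.append(name)
    return bypassable
```

Because `send` is injected, the probe can wrap any HTTP client against a staging deployment, or a fake function in unit tests. `attempts` should exceed the limit being tested, otherwise step 1 never observes a 429.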
FastAPI-Specific Remediation
Securing FastAPI rate limiting requires robust, tamper-resistant mechanisms. Here are the main remediation strategies:
1. Trusted Client Identification
Never trust raw headers for client identification. Honor X-Forwarded-For only when the direct peer is a proxy you operate, and take the client address from the right-hand end of the chain:
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded

app = FastAPI()

# Addresses of the reverse proxies you operate (example values)
TRUSTED_PROXIES = {"192.168.1.1", "10.0.0.1"}


def get_trusted_remote_address(request: Request) -> str:
    peer = request.client.host if request.client else "unknown"
    if peer not in TRUSTED_PROXIES:
        return peer  # Direct connection: ignore forwarding headers entirely
    # Flatten repeated headers and comma-separated values into one list of hops
    hops = [
        hop.strip()
        for value in request.headers.getlist("X-Forwarded-For")
        for hop in value.split(",")
        if hop.strip()
    ]
    # Walk from the right: the first hop that is not a trusted proxy is the client
    for hop in reversed(hops):
        if hop not in TRUSTED_PROXIES:
            return hop
    return peer


limiter = Limiter(key_func=get_trusted_remote_address, default_limits=["100/minute"])
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


@app.middleware("http")
async def proxy_validation(request: Request, call_next):
    # Store the resolved address so handlers and logs use the same value
    request.state.client_ip = get_trusted_remote_address(request)
    return await call_next(request)
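Trusted proxies are often whole subnets rather than single addresses, and header entries can be malformed. As a framework-free sketch of the same selection rule, assuming the example networks below and a hypothetical `resolve_client_ip` helper, the rightmost hop that is not one of your proxies can be chosen like this:

```python
import ipaddress
from typing import Iterable, List

# Assumption: example networks; replace with the ranges your proxies actually use
TRUSTED_NETWORKS = [ipaddress.ip_network(n) for n in ("10.0.0.0/8", "192.168.1.0/24")]


def _is_trusted(address: str) -> bool:
    try:
        ip = ipaddress.ip_address(address)
    except ValueError:
        return False  # Malformed entries are never treated as trusted proxies
    return any(ip in network for network in TRUSTED_NETWORKS)


def resolve_client_ip(peer_ip: str, xff_values: Iterable[str]) -> str:
    """Pick the rightmost X-Forwarded-For hop that is not one of our proxies."""
    if not _is_trusted(peer_ip):
        return peer_ip  # Direct connection: forwarding headers are attacker-controlled
    hops: List[str] = [
        hop.strip() for value in xff_values for hop in value.split(",") if hop.strip()
    ]
    for hop in reversed(hops):
        if not _is_trusted(hop):
            try:
                return str(ipaddress.ip_address(hop))
            except ValueError:
                return peer_ip  # Garbage in the header: fall back to the peer
    return peer_ip
```

Reading from the right matters because each proxy appends the address it saw, so everything to the left of the first untrusted hop was supplied by the client and can be forged.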
2. Atomic Rate Limiting with Redis
Implement atomic rate limiting to prevent race conditions:
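from fastapi import FastAPI, HTTPException, Request
from redis.asyncio import Redis
from slowapi.util import get_remote_address

app = FastAPI()
redis_client = Redis(host="localhost", port=6379)

# INCR and EXPIRE run inside one Lua script, so the counter can never be
# left without a TTL if the process dies between the two commands
INCR_WITH_TTL = """
local current = redis.call('INCR', KEYS[1])
if current == 1 then
  redis.call('EXPIRE', KEYS[1], ARGV[1])
end
return current
"""


async def atomic_rate_limit(key: str, limit: int, window: int) -> int:
    # The async client keeps the event loop free while Redis responds
    current = await redis_client.eval(INCR_WITH_TTL, 1, key, window)
    if current > limit:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    return current


@app.get("/protected-atomic")
async def protected_atomic(request: Request):
    await atomic_rate_limit(f"rl:ip:{get_remote_address(request)}", 100, 60)
    return {"data": "protected resource"}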
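The INCR-then-EXPIRE pattern is a fixed-window counter. For unit tests, or to reason about the semantics without a Redis server, the same behavior can be sketched in memory. `InMemoryFixedWindow` is a hypothetical class, and it assumes a single process: it does not share state across workers, so it is not a substitute for Redis in production.

```python
import threading
import time
from typing import Callable, Dict, Tuple


class InMemoryFixedWindow:
    """Fixed-window counter whose check-and-increment runs under one lock."""

    def __init__(self, limit: int, window: float, clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock  # Injectable so tests can control time
        self._lock = threading.Lock()
        self._state: Dict[str, Tuple[float, int]] = {}  # key -> (window_start, count)

    def hit(self, key: str) -> bool:
        """Record one request and report whether it is within the limit."""
        now = self.clock()
        with self._lock:
            start, count = self._state.get(key, (now, 0))
            if now - start >= self.window:
                start, count = now, 0  # Window expired: start a new one
            count += 1
            self._state[key] = (start, count)
            return count <= self.limit
```

Because the read, the increment, and the comparison all happen while the lock is held, concurrent callers cannot observe the same count, which is the property the Redis version gets from executing a single script.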
3. Secure API Key Management
Implement rate-limited API key generation with proper validation:
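import secrets
import time

from fastapi import Depends, FastAPI, Header, HTTPException, Request
from pydantic import BaseModel
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

app = FastAPI()
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


class APIKey(BaseModel):
    key: str
    requests_remaining: int = 100
    created_at: float


api_keys = {}


async def get_api_key(x_api_key: str = Header(...)) -> str:
    # Assumed helper (not defined in the original): reads the X-API-Key header
    return x_api_key


async def validate_api_key(api_key: str = Depends(get_api_key)) -> APIKey:
    if api_key not in api_keys:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return api_keys[api_key]


# The route decorator must sit above the limit decorator, and slowapi
# needs the endpoint to accept a `request` argument
@app.post("/generate-key")
@limiter.limit("5/minute")
async def generate_key(request: Request):
    key = secrets.token_urlsafe(32)
    api_keys[key] = APIKey(key=key, created_at=time.time())
    return {"api_key": key}


@app.get("/protected-key")
@limiter.limit("100/minute")
async def protected_key(request: Request, api_key: APIKey = Depends(validate_api_key)):
    if api_key.requests_remaining <= 0:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    # In-process counter only; use an atomic shared counter when running several workers
    api_key.requests_remaining -= 1
    return {"data": "protected resource"}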
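Limiting the generation endpoint slows key rotation but does not remove the incentive, because each new key still starts with a full quota. One complementary design, sketched here under the assumption that key creation is tied to an authenticated account, is to charge usage to the owner instead of the key and to cap the number of keys per owner. `KeyRegistry` and its parameters are hypothetical names for illustration.

```python
import secrets
from typing import Dict


class KeyRegistry:
    """Maps API keys to owners and counts usage per owner, not per key."""

    def __init__(self, owner_quota: int, max_keys_per_owner: int = 3):
        self.owner_quota = owner_quota
        self.max_keys_per_owner = max_keys_per_owner
        self._owner_of: Dict[str, str] = {}
        self._used: Dict[str, int] = {}

    def issue_key(self, owner: str) -> str:
        """Create a key for an owner, refusing once the per-owner cap is reached."""
        active = sum(1 for o in self._owner_of.values() if o == owner)
        if active >= self.max_keys_per_owner:
            raise PermissionError("key limit reached for this owner")
        key = secrets.token_urlsafe(32)
        self._owner_of[key] = owner
        return key

    def consume(self, key: str) -> bool:
        """Charge one request to the key's owner; unknown keys raise KeyError."""
        owner = self._owner_of[key]
        used = self._used.get(owner, 0)
        if used >= self.owner_quota:
            return False
        self._used[owner] = used + 1
        return True
```

With this layout a second key issued to the same owner shares the already-spent quota, so rotating keys no longer resets the limit. The in-memory dictionaries carry the same single-process caveat as the other examples and would move to a shared store in a real deployment.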
4. Comprehensive Rate Limiting Middleware
Implement a comprehensive rate limiting solution that covers all vectors:
import time

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from slowapi import Limiter
from slowapi.util import get_remote_address

app = FastAPI()


class ComprehensiveLimiter:
    def __init__(self):
        self.limiter = Limiter(key_func=get_remote_address, default_limits=["100/minute"])
        self.api_key_limits = {}
        self.user_limits = {}

    async def limit_by_ip(self, request: Request):
        ip = get_remote_address(request)
        await self._check_limit(f"ip:{ip}", 100, 60)

    async def limit_by_api_key(self, api_key: str):
        now = time.time()
        entry = self.api_key_limits.get(api_key)
        # Start a fresh quota for new keys and for keys whose window has passed
        if entry is None or now >= entry["reset"]:
            entry = self.api_key_limits[api_key] = {"remaining": 1000, "reset": now + 3600}
        if entry["remaining"] <= 0:
            raise HTTPException(429, detail="API key rate limit exceeded")
        entry["remaining"] -= 1

    async def _check_limit(self, key: str, limit: int, window: int):
        # Placeholder: perform the atomic check against Redis here
        pass


comprehensive_limiter = ComprehensiveLimiter()


@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    try:
        await comprehensive_limiter.limit_by_ip(request)
    except HTTPException as exc:
        # An HTTPException raised in middleware is not converted by FastAPI's
        # exception handlers, so build the response here
        return JSONResponse(status_code=exc.status_code, content={"detail": exc.detail})
    return await call_next(request)
These remediation strategies address the most common FastAPI rate limiting bypass vulnerabilities by implementing trusted client identification, atomic operations, secure key management, and comprehensive rate limiting coverage.
Related CWEs: Resource Consumption
| CWE ID | Name | Severity |
|---|---|---|
| CWE-400 | Uncontrolled Resource Consumption | HIGH |
| CWE-770 | Allocation of Resources Without Limits or Throttling | MEDIUM |
| CWE-799 | Improper Control of Interaction Frequency | MEDIUM |
| CWE-835 | Loop with Unreachable Exit Condition ('Infinite Loop') | HIGH |
| CWE-1050 | Excessive Platform Resource Consumption within a Loop | MEDIUM |