HIGH · rate limiting bypass · FastAPI

Rate Limiting Bypass in FastAPI

How Rate Limiting Bypass Manifests in FastAPI

Rate limiting bypass vulnerabilities in FastAPI applications usually stem from flaws in how the rate limiting logic is implemented, particularly in middleware or decorators. The most common bypass vectors include:

  • IP Spoofing: FastAPI has no built-in rate limiter; third-party limiters such as slowapi typically key on request.client.host. When the application, or a proxy-headers layer in front of it, copies X-Forwarded-For into that value without checking who sent it, attackers can forge the header or use proxy chains to appear as different clients.
  • API Key Rotation: When rate limits are enforced per API key but key issuance itself is unrestricted, attackers can generate or rotate keys to reset their quota.
  • Token Manipulation: JWT-based rate limiting that reads claims before verifying the signature can be bypassed by altering the token payload.
  • Race Conditions: Check-then-decrement counters that are not atomic can be overrun by concurrent requests, especially when an await sits between the check and the update or when several workers share the counter.

Here's a vulnerable FastAPI implementation that's susceptible to multiple bypass techniques:

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

app = FastAPI()

# Vulnerable when the server fills request.client from X-Forwarded-For for any peer,
# for example uvicorn started with --forwarded-allow-ips='*'
limiter = Limiter(key_func=get_remote_address, default_limits=["10/minute"])
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/vulnerable")
@limiter.limit("10/minute")
async def vulnerable_endpoint(request: Request):
    return {"message": "This endpoint is rate limited"}

When forwarded headers are honored without validation, each forged X-Forwarded-For value gets its own rate-limit bucket:

curl -H "X-Forwarded-For: 1.1.1.1" http://localhost:8000/vulnerable
curl -H "X-Forwarded-For: 2.2.2.2" http://localhost:8000/vulnerable

Another common pattern involves improper API key handling:

import secrets

from fastapi import Depends, FastAPI, Header, HTTPException
from pydantic import BaseModel

app = FastAPI()

class APIKey(BaseModel):
    key: str
    requests_remaining: int = 100

api_keys: dict[str, APIKey] = {}

async def get_api_key(x_api_key: str = Header(...)) -> str:
    # Assumed helper (not defined in the original): reads the X-API-Key header
    return x_api_key

async def validate_api_key(api_key: str = Depends(get_api_key)) -> APIKey:
    if api_key not in api_keys:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return api_keys[api_key]

@app.post("/generate-key")
async def generate_key():
    # Vulnerable: no rate limiting on key generation
    new_key = secrets.token_urlsafe(32)
    api_keys[new_key] = APIKey(key=new_key)
    return {"api_key": new_key}

@app.get("/protected")
async def protected_endpoint(api_key: APIKey = Depends(validate_api_key)):
    if api_key.requests_remaining <= 0:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    api_key.requests_remaining -= 1  # Check and decrement are separate steps on per-process state
    return {"data": "protected resource"}

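This implementation has two weaknesses. The quota check and the decrement are separate steps on per-process state, so with several workers, or once any await separates the two steps, concurrent requests can all pass the check and push the counter below zero. And because the key generation endpoint is not rate limited, a client can mint a fresh key with a full quota whenever the old one runs out.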
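
The race can be reproduced without a web server. The following sketch is a simplified model, not the endpoint above: `asyncio.sleep(0)` stands in for any awaited I/O (a database call, for instance) between the check and the decrement, and the names `Quota`, `racy_consume`, `locked_consume`, and `run` are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Tuple


class Quota:
    def __init__(self, remaining: int) -> None:
        self.remaining = remaining
        self.lock = asyncio.Lock()


async def racy_consume(quota: Quota) -> bool:
    if quota.remaining <= 0:
        return False
    await asyncio.sleep(0)  # yields to other requests between check and update
    quota.remaining -= 1
    return True


async def locked_consume(quota: Quota) -> bool:
    # The lock makes check + decrement one critical section within this process
    async with quota.lock:
        if quota.remaining <= 0:
            return False
        await asyncio.sleep(0)
        quota.remaining -= 1
        return True


async def run(consume: Callable[[Quota], Awaitable[bool]], budget: int, requests: int) -> Tuple[int, int]:
    """Fire `requests` concurrent calls; return (requests allowed, quota left)."""
    quota = Quota(budget)
    results = await asyncio.gather(*(consume(quota) for _ in range(requests)))
    return sum(results), quota.remaining


if __name__ == "__main__":
    print(asyncio.run(run(racy_consume, 5, 20)))    # allows more than 5 requests
    print(asyncio.run(run(locked_consume, 5, 20)))  # allows exactly 5
```

An `asyncio.Lock` only protects a single process. With several workers the counter has to live in a shared store that can increment atomically.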

FastAPI-Specific Detection

Detecting rate limiting bypass vulnerabilities in FastAPI requires both static analysis and dynamic testing. Here are the key detection strategies:

Static Analysis Patterns

Look for these anti-patterns in your FastAPI codebase:

# 1. Untrusted header usage for client identification
@app.middleware("http")
async def identify_client(request: Request, call_next):
    # Vulnerable unless request.client.host is first checked against a trusted proxy list
    client_ip = request.headers.get("X-Forwarded-For", request.client.host)
    request.state.client_ip = client_ip
    return await call_next(request)

# 2. Non-atomic rate limit decrements
@app.get("/endpoint")
async def endpoint(counter: Counter = Depends(get_counter)):
    if counter.value <= 0:
        raise HTTPException(429)
    counter.value -= 1  # Race condition: check and decrement are separate, non-atomic steps
    return {"data": "response"}

# 3. Missing rate limiting on key/resource generation
@app.post("/generate-resource")
async def generate_resource():
    # No rate limiting - vulnerable to abuse
    return create_resource()
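
The third pattern lends itself to a quick automated check. The sketch below uses Python's `ast` module to list handlers that carry an HTTP-method decorator but no `.limit(...)` decorator. It is a heuristic under stated assumptions: it matches on decorator attribute names only (so it assumes a slowapi-style `@limiter.limit`), and it will not see limits applied through middleware or `default_limits`. The function name `unlimited_routes` is hypothetical.

```python
import ast
from typing import List, Optional

HTTP_METHODS = {"get", "post", "put", "patch", "delete"}


def _decorator_attr(node: ast.expr) -> Optional[str]:
    """Return the attribute name of decorators like @app.post(...) or @limiter.limit(...)."""
    target = node.func if isinstance(node, ast.Call) else node
    return target.attr if isinstance(target, ast.Attribute) else None


def unlimited_routes(source: str) -> List[str]:
    """List route handlers that have an HTTP-method decorator but no .limit decorator."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        attrs = {_decorator_attr(d) for d in node.decorator_list}
        if attrs & HTTP_METHODS and "limit" not in attrs:
            findings.append(node.name)
    return findings


SAMPLE = '''
@app.post("/generate-key")
async def generate_key():
    pass

@app.get("/protected")
@limiter.limit("10/minute")
async def protected(request):
    pass
'''

if __name__ == "__main__":
    print(unlimited_routes(SAMPLE))  # ['generate_key']
```

Treat the output as a review list rather than a verdict: some unlimited routes are intentional, and a flagged route may still be covered by a global limit.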

Dynamic Testing with middleBrick

middleBrick's rate limiting bypass detection specifically targets FastAPI applications by:

  • Testing header manipulation (X-Forwarded-For, X-Real-IP, Forwarded) to identify IP-based rate limiting bypasses
  • Analyzing API key generation endpoints for missing rate limits
  • Detecting race conditions through concurrent request testing
  • Identifying JWT-based rate limiting that trusts unverified claims

Using middleBrick's CLI to scan a FastAPI API:

npm install -g middlebrick
middlebrick scan https://api.example.com --output json

The scanner will identify specific vulnerabilities like:

{
  "rate_limiting_bypass": {
    "severity": "high",
    "finding": "API key generation endpoint at /api/keys lacks rate limiting",
    "impact": "Unlimited API key creation enables complete rate limit bypass",
    "remediation": "Add rate limiting to key generation endpoint using @limiter.limit(\"5/minute\")"
  }
}

middleBrick also tests for common FastAPI-specific bypass patterns by sending requests with various header combinations and analyzing the server's response patterns to detect inconsistent rate limiting behavior.
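
The header-rotation idea can also be checked by hand. The sketch below is not middleBrick's implementation; it is a minimal, transport-agnostic probe in which `send` is any callable that takes request headers and returns an HTTP status code. The names `count_accepted` and `header_rotation_bypasses` are hypothetical, and `attempts` must exceed the endpoint's limit for the comparison to mean anything.

```python
from typing import Callable, Dict, List

Sender = Callable[[Dict[str, str]], int]  # takes request headers, returns HTTP status


def count_accepted(send: Sender, headers_per_request: List[Dict[str, str]]) -> int:
    """Send one request per header set and count responses that were not 429."""
    return sum(1 for headers in headers_per_request if send(headers) != 429)


def header_rotation_bypasses(send: Sender, attempts: int = 30) -> bool:
    """Compare a fixed-identity burst with a burst that rotates X-Forwarded-For."""
    baseline = count_accepted(send, [{} for _ in range(attempts)])
    rotated = count_accepted(
        send,
        [{"X-Forwarded-For": f"203.0.113.{i % 254 + 1}"} for i in range(attempts)],
    )
    # If rotating the header lets more requests through, the limiter keys on it
    return rotated > baseline


def make_fake_server(limit: int, trusts_header: bool) -> Sender:
    """In-memory stand-in for an endpoint, used here instead of real HTTP calls."""
    counts: Dict[str, int] = {}

    def send(headers: Dict[str, str]) -> int:
        key = headers.get("X-Forwarded-For", "peer") if trusts_header else "peer"
        counts[key] = counts.get(key, 0) + 1
        return 429 if counts[key] > limit else 200

    return send


if __name__ == "__main__":
    print(header_rotation_bypasses(make_fake_server(10, trusts_header=True)))   # True
    print(header_rotation_bypasses(make_fake_server(10, trusts_header=False)))  # False
```

Against a real service, `send` could wrap an HTTP client call that returns the response status. Run such probes only against systems you are authorized to test.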

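FastAPI-Specific Remediation

Securing FastAPI rate limiting comes down to a client identity the caller cannot forge, counters that update atomically, and limits on every endpoint that can mint new identities. The strategies below cover each:

1. Trusted Client Identification

Never trust raw headers for client identification. Honor X-Forwarded-For only when the direct peer (request.client.host) is a proxy you operate, and take the address that proxy recorded: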

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded

app = FastAPI()

# Addresses of the reverse proxies you operate (example values)
TRUSTED_PROXIES = {"192.168.1.1", "10.0.0.1"}

def get_trusted_remote_address(request: Request) -> str:
    peer = request.client.host if request.client else "unknown"
    if peer not in TRUSTED_PROXIES:
        # Direct connection: ignore any forwarded headers the client sent
        return peer
    # Flatten repeated headers and comma-separated values into one hop list
    hops = [
        hop.strip()
        for value in request.headers.getlist("X-Forwarded-For")
        for hop in value.split(",")
        if hop.strip()
    ]
    # Walk from the right: the first hop that is not one of our proxies
    # is the address our own infrastructure observed
    for hop in reversed(hops):
        if hop not in TRUSTED_PROXIES:
            return hop
    return peer

limiter = Limiter(key_func=get_trusted_remote_address, default_limits=["100/minute"])
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.middleware("http")
async def proxy_validation(request: Request, call_next):
    # Expose the validated address to handlers and logging
    request.state.client_ip = get_trusted_remote_address(request)
    return await call_next(request)
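
Proxy fleets are usually described by network ranges rather than single addresses. The sketch below isolates the hop-selection logic as a pure function so it can be unit-tested without a running app. It assumes each trusted proxy appends the address of its direct peer to X-Forwarded-For; the name `resolve_client_ip` and the example ranges in `TRUSTED_NETWORKS` are hypothetical.

```python
import ipaddress
from typing import Iterable, List

# Example ranges only; replace with the networks your proxies actually use
TRUSTED_NETWORKS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("192.168.1.0/24"),
]


def _is_trusted(address: str) -> bool:
    try:
        ip = ipaddress.ip_address(address)
    except ValueError:
        return False  # malformed values are never treated as proxies
    return any(ip in network for network in TRUSTED_NETWORKS)


def resolve_client_ip(peer: str, forwarded_for: Iterable[str]) -> str:
    """Pick the rate-limit identity from the TCP peer and X-Forwarded-For values."""
    if not _is_trusted(peer):
        # The request did not come through our proxies: ignore the header
        return peer
    hops: List[str] = [
        hop.strip()
        for value in forwarded_for
        for hop in value.split(",")
        if hop.strip()
    ]
    # Rightmost entries were appended by our own proxies; skip those
    for hop in reversed(hops):
        if not _is_trusted(hop):
            return hop
    return peer
```

Reading the list from the right matters: entries on the left are whatever the client chose to send, while the rightmost untrusted entry is the address your own proxy saw.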

2. Atomic Rate Limiting with Redis

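Let Redis perform the increment so that the check and the update are a single operation, even across workers:

from fastapi import FastAPI, HTTPException, Request
from redis.asyncio import Redis
from slowapi.util import get_remote_address

app = FastAPI()
redis_client = Redis(host="localhost", port=6379)

# INCR and EXPIRE run as one script, so a counter is never left without a TTL
INCR_WITH_TTL = """
local current = redis.call('INCR', KEYS[1])
if current == 1 then
    redis.call('EXPIRE', KEYS[1], ARGV[1])
end
return current
"""

async def atomic_rate_limit(key: str, limit: int, window: int) -> int:
    # The async client keeps the event loop free while Redis answers
    current = await redis_client.eval(INCR_WITH_TTL, 1, key, window)
    if current > limit:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    return current

@app.get("/protected-atomic")
async def protected_atomic(request: Request):
    await atomic_rate_limit(f"rl:ip:{get_remote_address(request)}", 100, 60)
    return {"data": "protected resource"}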
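
To reason about what an increment-with-expiry counter enforces, it helps to model it without Redis. The class below is an in-memory stand-in for that fixed-window pattern, intended for tests and single-process experiments only; `FixedWindowCounter` and its injectable `clock` parameter are hypothetical names introduced for this sketch.

```python
import time
from typing import Callable, Dict, Tuple


class FixedWindowCounter:
    """In-memory model of the INCR + EXPIRE pattern (single process only)."""

    def __init__(self, limit: int, window: int, clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window = window
        self.clock = clock
        self._counts: Dict[str, Tuple[float, int]] = {}  # key -> (expires_at, count)

    def allow(self, key: str) -> bool:
        now = self.clock()
        expires_at, count = self._counts.get(key, (0.0, 0))
        if now >= expires_at:
            # First hit in a new window: reset the count and start the TTL
            expires_at, count = now + self.window, 0
        count += 1
        self._counts[key] = (expires_at, count)
        return count <= self.limit


if __name__ == "__main__":
    fake_now = [1000.0]
    counter = FixedWindowCounter(limit=3, window=60, clock=lambda: fake_now[0])
    print([counter.allow("ip:203.0.113.9") for _ in range(5)])  # [True, True, True, False, False]
    fake_now[0] += 60
    print(counter.allow("ip:203.0.113.9"))  # True
```

One property of fixed windows is worth knowing: a client that spends its full limit just before a window expires and again just after it gets roughly twice the limit in a short span. If that matters for an endpoint, a sliding-window or token-bucket scheme is the usual alternative.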

3. Secure API Key Management

Implement rate-limited API key generation with proper validation:

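import secrets
import time

from fastapi import Depends, FastAPI, Header, HTTPException, Request
from pydantic import BaseModel
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

app = FastAPI()
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

class APIKey(BaseModel):
    key: str
    requests_remaining: int = 100
    created_at: float

api_keys: dict[str, APIKey] = {}

async def get_api_key(x_api_key: str = Header(...)) -> str:
    # Assumed helper (not defined in the original): reads the X-API-Key header
    return x_api_key

# The route decorator goes on top, and slowapi requires a Request parameter
@app.post("/generate-key")
@limiter.limit("5/minute")
async def generate_key(request: Request):
    key = secrets.token_urlsafe(32)
    api_keys[key] = APIKey(key=key, created_at=time.time())
    return {"api_key": key}

async def validate_api_key(api_key: str = Depends(get_api_key)) -> APIKey:
    if api_key not in api_keys:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return api_keys[api_key]

@app.get("/protected-key")
@limiter.limit("100/minute")
async def protected_key(request: Request, api_key: APIKey = Depends(validate_api_key)):
    # No await between check and decrement, so this holds within one process;
    # move the counter to a shared store such as Redis when running several workers
    if api_key.requests_remaining <= 0:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    api_key.requests_remaining -= 1
    return {"data": "protected resource"}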
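
Rate limiting the issuance endpoint slows key rotation but does not stop a patient client from collecting keys. One complementary approach, sketched below, charges usage to the owner of a key rather than to the key itself, so a new key never comes with a new quota. This assumes key issuance is tied to an authenticated owner identity, which the example above does not show; `KeyRegistry` and its fields are hypothetical names.

```python
import secrets
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class KeyRegistry:
    max_keys_per_owner: int = 3
    quota_per_owner: int = 100
    _owner_of: Dict[str, str] = field(default_factory=dict)   # api key -> owner id
    _remaining: Dict[str, int] = field(default_factory=dict)  # owner id -> requests left

    def issue_key(self, owner: str) -> str:
        held = sum(1 for o in self._owner_of.values() if o == owner)
        if held >= self.max_keys_per_owner:
            raise PermissionError("key limit reached for this owner")
        key = secrets.token_urlsafe(32)
        self._owner_of[key] = owner
        # The quota is created once per owner and shared by all of their keys
        self._remaining.setdefault(owner, self.quota_per_owner)
        return key

    def consume(self, key: str) -> bool:
        owner = self._owner_of.get(key)
        if owner is None:
            raise KeyError("unknown API key")
        if self._remaining[owner] <= 0:
            return False
        self._remaining[owner] -= 1
        return True
```

As with the other in-memory examples, this only demonstrates the accounting; a multi-worker deployment would keep both maps in a shared store and decrement atomically.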

4. Comprehensive Rate Limiting Middleware

Combine the pieces into one limiter that enforces both per-IP and per-key limits through the same atomic counter:

import time

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from redis.asyncio import Redis
from slowapi.util import get_remote_address

app = FastAPI()
redis_client = Redis(host="localhost", port=6379)

class ComprehensiveLimiter:
    async def limit_by_ip(self, request: Request):
        ip = get_remote_address(request)
        await self._check_limit(f"ip:{ip}", 100, 60)

    async def limit_by_api_key(self, api_key: str):
        # Call from a dependency once the key has been validated
        await self._check_limit(f"key:{api_key}", 1000, 3600)

    async def _check_limit(self, key: str, limit: int, window: int):
        # Fixed-window counter: one Redis key per identity per window
        bucket = f"rl:{key}:{int(time.time()) // window}"
        async with redis_client.pipeline(transaction=True) as pipe:
            # MULTI/EXEC sends INCR and EXPIRE as a single transaction
            pipe.incr(bucket)
            pipe.expire(bucket, window)
            current, _ = await pipe.execute()
        if current > limit:
            raise HTTPException(status_code=429, detail="Rate limit exceeded")

comprehensive_limiter = ComprehensiveLimiter()

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    try:
        await comprehensive_limiter.limit_by_ip(request)
    except HTTPException as exc:
        # Exceptions raised inside middleware are not routed to FastAPI's
        # exception handlers, so build the 429 response here
        return JSONResponse(status_code=exc.status_code, content={"detail": exc.detail})
    return await call_next(request)

These remediation strategies address the most common FastAPI rate limiting bypass vulnerabilities by combining trusted client identification, atomic counters, rate-limited key issuance, and limits enforced on every request path.

Related CWEs: resourceConsumption

CWE ID | Name | Severity
CWE-400 | Uncontrolled Resource Consumption | HIGH
CWE-770 | Allocation of Resources Without Limits or Throttling | MEDIUM
CWE-799 | Improper Control of Interaction Frequency | MEDIUM
CWE-835 | Loop with Unreachable Exit Condition ('Infinite Loop') | HIGH
CWE-1050 | Excessive Platform Resource Consumption within a Loop | MEDIUM

Frequently Asked Questions

How can I test if my FastAPI rate limiting is vulnerable to bypass attacks?
Use middleBrick's CLI to scan your FastAPI API endpoints. The scanner tests for common bypass vectors including header manipulation, race conditions, and missing rate limits on key generation endpoints. You can also test manually by sending concurrent requests, manipulating X-Forwarded-For headers, and attempting to generate multiple API keys rapidly to see whether rate limits are enforced.

What's the difference between IP-based and API key-based rate limiting in FastAPI?
IP-based rate limiting identifies clients by their IP address, which may be shared by many users behind NAT and can be misreported when forwarded headers are trusted without validation. API key-based rate limiting ties limits to specific credentials, offering better control but requiring secure key management. For production FastAPI applications, a hybrid approach that uses both methods with atomic operations and trusted proxy validation provides the most robust protection against bypass attacks.