API Rate Abuse in FastAPI with JWT Tokens
API Rate Abuse in FastAPI with JWT Tokens: how this specific combination creates or exposes the vulnerability
Rate abuse in a FastAPI application that authenticates with JWT tokens means sending requests or authentication attempts at a volume that overwhelms specific endpoints or the token verification layer itself. Even when every token is properly signed, an attacker can hammer the verification path or token-protected endpoints to exhaust server resources or slip past the intended throttling.
In FastAPI, developers often apply global or route-level rate limiting keyed on the client IP address. A JWT carries a user-specific identifier (e.g., sub or a custom claim), but IP-only limits ignore it. An attacker who spreads requests across many source addresses can then replay a single stolen token well beyond any intended per-user quota, and a per-IP counter cannot tell whether a burst from one address comes from one identity or from many stolen or generated tokens being tried in turn. This shifts abuse from simple IP flooding to token enumeration or token reuse attacks.
Another scenario involves endpoints that decode and validate the JWT on every request without caching the validation result. If the validation logic is computationally expensive (for example, making extra calls to a key provider or performing heavy cryptographic checks), an attacker can send many requests with different or slightly modified tokens to consume CPU and memory, leading to denial of service.
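One way to limit the cost of junk tokens is to reject structurally impossible input before any signature verification or key lookup runs. The sketch below is an illustration that uses only the standard library. The names `MAX_TOKEN_LENGTH`, `ALLOWED_ALGS`, and `looks_like_jwt` are hypothetical, and the size limit is an assumed value. The check complements full verification and never replaces it.

```python
import base64
import binascii
import json

MAX_TOKEN_LENGTH = 4096   # assumed upper bound; tune to your real token sizes
ALLOWED_ALGS = {"HS256"}  # assumed to mirror the verifier's configuration


def _b64url_decode(segment: str) -> bytes:
    """Decode an unpadded base64url segment as used in JWTs."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def looks_like_jwt(token: str) -> bool:
    """Cheap structural filter; a True result still requires full verification."""
    if not token or len(token) > MAX_TOKEN_LENGTH:
        return False
    parts = token.split(".")
    if len(parts) != 3 or not all(parts):
        return False
    try:
        header = json.loads(_b64url_decode(parts[0]))
    except (binascii.Error, ValueError):
        # Covers bad padding, non-ASCII input, invalid UTF-8, and invalid JSON
        return False
    if not isinstance(header, dict):
        return False
    alg = header.get("alg")
    return isinstance(alg, str) and alg in ALLOWED_ALGS
```

Requests that fail this filter can be answered with the same generic 401 as any other invalid token, so the shortcut saves work without revealing why a token was rejected.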
The combination also exposes issues in token lifecycle handling. If token revocation or introspection is not enforced on each request, an attacker can use compromised tokens repeatedly until they naturally expire, especially when global rate limits do not account for token validity state. Furthermore, if error messages during token validation differ between valid and invalid tokens, an attacker can use rate-limited probes to enumerate valid tokens or user identities by observing response variations, a subtle information-disclosure vector.
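Per-request revocation can be enforced with a denylist keyed by the token's `jti` claim, where each entry only has to live until the token's own `exp`. The following is a minimal in-process sketch, assuming tokens carry `jti` and `exp` claims. The class and method names are hypothetical, and a shared store such as Redis would be needed once more than one worker process is involved.

```python
import time
from typing import Dict, Optional


class RevocationList:
    """In-memory denylist of revoked token IDs (jti), pruned by token expiry."""

    def __init__(self) -> None:
        # Maps jti -> the token's expiry time in epoch seconds
        self._revoked: Dict[str, float] = {}

    def revoke(self, jti: str, exp: float) -> None:
        """Mark a token as revoked until it would have expired anyway."""
        self._revoked[jti] = exp

    def is_revoked(self, jti: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        self._prune(now)
        return jti in self._revoked

    def _prune(self, now: float) -> None:
        # Entries for expired tokens are dropped: signature validation already
        # rejects those tokens, so the denylist stays small.
        # This scan is linear in the number of entries; fine for a sketch.
        for jti in [j for j, exp in self._revoked.items() if exp <= now]:
            del self._revoked[jti]
```

A validation dependency would call `is_revoked(payload["jti"])` after the signature check and return the same generic 401 when it is true.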
middleBrick scans such API surfaces with its Authentication and Rate Limiting checks, running unauthenticated probes to detect whether rate controls are applied per token or per IP and whether token validation paths are susceptible to resource exhaustion. The tool also flags inconsistent error handling in authentication flows, which can aid attackers in refining abuse strategies.
JWT Token-Specific Remediation in FastAPI: concrete code fixes
To mitigate rate abuse in FastAPI with JWT tokens, apply rate limits on dimensions that include the user identity from the token, cache validation results, and ensure token validation is efficient and consistent.
1. Rate limiting with token-aware identifiers
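Instead of relying solely on IP-based limits, extract a stable user ID from the validated JWT and use it as part of the rate-limiting key. This keeps a single identity from exceeding its quota by rotating source IPs. Keep an IP-based limit alongside it, because per-user limits alone do not bound an attacker who holds many tokens.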
    from fastapi import Depends, FastAPI, HTTPException, Request, status
    from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
    from slowapi import Limiter, _rate_limit_exceeded_handler
    from slowapi.errors import RateLimitExceeded
    from slowapi.util import get_remote_address
    import jwt

    app = FastAPI()
    limiter = Limiter(key_func=get_remote_address)  # default key; overridden per route below
    # slowapi expects the limiter on app.state and a handler that turns
    # RateLimitExceeded into a 429 response
    app.state.limiter = limiter
    app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

    SECRET_KEY = "your-secret-key"  # placeholder; load from configuration in practice
    ALGORITHM = "HS256"
    security = HTTPBearer()


    def _unauthorized(detail: str) -> HTTPException:
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )


    def decode_token(token: str) -> str:
        """Validate the JWT and return its subject, or raise 401."""
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except jwt.ExpiredSignatureError:
            raise _unauthorized("Token expired")
        except jwt.InvalidTokenError:
            raise _unauthorized("Invalid token")
        user_id = payload.get("sub")
        if user_id is None:
            raise _unauthorized("Invalid token payload")
        return user_id


    def user_or_ip_key(request: Request) -> str:
        """Rate-limit key: the token subject when the token is valid, else the client IP."""
        scheme, _, token = request.headers.get("authorization", "").partition(" ")
        if scheme.lower() == "bearer" and token:
            try:
                return f"user:{decode_token(token)}"
            except HTTPException:
                pass  # invalid tokens are counted against the caller's IP instead
        return f"ip:{get_remote_address(request)}"


    @app.get("/secure")
    @limiter.limit("10/minute", key_func=user_or_ip_key)
    async def secure_route(
        request: Request,  # slowapi requires a `request` argument on limited routes
        credentials: HTTPAuthorizationCredentials = Depends(security),
    ):
        user_id = decode_token(credentials.credentials)
        return {"message": f"Hello user {user_id}"}
In this example, the rate limit key includes the decoded user ID from the JWT, ensuring that limits apply per user rather than per IP.
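Per-user and per-IP limits cover different attacks, so it can help to enforce both on the same request. The sketch below is a framework-independent illustration of that idea using a sliding-window counter from the standard library. The class name, the helper `allow_request`, and the limits of 10 and 100 requests per minute are assumptions chosen for the example, not values from any library.

```python
import time
from collections import defaultdict, deque
from typing import Deque, Dict, Optional


class SlidingWindowLimiter:
    """Allow at most `limit` hits per key within the trailing `window_seconds`."""

    def __init__(self, limit: int, window_seconds: float) -> None:
        self.limit = limit
        self.window = window_seconds
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        hits = self._hits[key]
        # Drop timestamps that have fallen out of the window
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True


per_user = SlidingWindowLimiter(limit=10, window_seconds=60)
per_ip = SlidingWindowLimiter(limit=100, window_seconds=60)


def allow_request(user_id: str, client_ip: str, now: Optional[float] = None) -> bool:
    """A request passes only if both its IP and its token subject are under budget."""
    # Both limiters are always consulted, so an accepted hit on one counter is
    # recorded even when the other counter rejects the request.
    ip_ok = per_ip.allow(f"ip:{client_ip}", now)
    user_ok = per_user.allow(f"user:{user_id}", now)
    return ip_ok and user_ok
```

With this arrangement, one user rotating addresses is stopped by the per-user counter, while one address cycling through many tokens is stopped by the per-IP counter.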
2. Cache token validation
To avoid expensive repeated validation, cache the decoded payload for a short window that never extends past the token's own exp claim. Use an in-memory or distributed cache keyed by a hash of the token, and bound its size so that attacker-supplied tokens cannot fill it.
    import hashlib
    import time

    from fastapi import Depends, FastAPI, HTTPException, Request, status
    from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
    from slowapi import Limiter, _rate_limit_exceeded_handler
    from slowapi.errors import RateLimitExceeded
    from slowapi.util import get_remote_address
    import jwt

    app = FastAPI()
    limiter = Limiter(key_func=get_remote_address)
    app.state.limiter = limiter
    app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

    SECRET_KEY = "your-secret-key"
    ALGORITHM = "HS256"
    security = HTTPBearer()

    # Simple in-memory cache; use Redis or similar in production
    validation_cache = {}  # sha256(token) -> {"expires_at": float, "payload": dict}
    CACHE_TTL = 300  # seconds
    MAX_CACHE_ENTRIES = 10_000  # crude bound so the cache cannot grow without limit


    def cached_decode_token(token: str) -> dict:
        # Key by a hash so raw tokens are not held in the cache
        key = hashlib.sha256(token.encode()).hexdigest()
        now = time.time()
        cached = validation_cache.get(key)
        if cached and now < cached["expires_at"]:
            return cached["payload"]
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except jwt.ExpiredSignatureError:
            validation_cache.pop(key, None)  # drop any stale entry for this token
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token expired",
                headers={"WWW-Authenticate": "Bearer"},
            )
        except jwt.InvalidTokenError:
            # Failures are not cached: attacker-chosen junk tokens would fill the cache
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token",
                headers={"WWW-Authenticate": "Bearer"},
            )
        if len(validation_cache) >= MAX_CACHE_ENTRIES:
            validation_cache.clear()
        # Never serve a cached payload past the token's own expiry
        expires_at = min(now + CACHE_TTL, payload.get("exp", now + CACHE_TTL))
        validation_cache[key] = {"expires_at": expires_at, "payload": payload}
        return payload


    @app.get("/items")
    @limiter.limit("20/minute", key_func=get_remote_address)
    async def read_items(
        request: Request,  # slowapi requires a `request` argument on limited routes
        credentials: HTTPAuthorizationCredentials = Depends(security),
    ):
        payload = cached_decode_token(credentials.credentials)
        user_id = payload.get("sub")
        return {"user": user_id, "items": []}
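Where an in-process cache is kept, evicting the least recently used entry is gentler than discarding everything once a size limit is reached. The class below is one possible sketch built on `collections.OrderedDict`. `BoundedTokenCache` and its parameters are hypothetical names, and the default sizes are assumptions. It stores only payloads that have already passed signature verification.

```python
import hashlib
import time
from collections import OrderedDict
from typing import Any, Dict, Optional, Tuple


class BoundedTokenCache:
    """LRU cache of verified JWT payloads with a TTL capped by the token's exp."""

    def __init__(self, max_entries: int = 10_000, ttl_seconds: float = 300.0) -> None:
        self.max_entries = max_entries
        self.ttl = ttl_seconds
        self._entries: "OrderedDict[str, Tuple[float, Dict[str, Any]]]" = OrderedDict()

    @staticmethod
    def _key(token: str) -> str:
        # Hash the token so the raw credential is not stored as a key
        return hashlib.sha256(token.encode("utf-8")).hexdigest()

    def get(self, token: str, now: Optional[float] = None) -> Optional[Dict[str, Any]]:
        now = time.time() if now is None else now
        key = self._key(token)
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, payload = entry
        if now >= expires_at:
            del self._entries[key]
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return payload

    def put(self, token: str, payload: Dict[str, Any], now: Optional[float] = None) -> None:
        now = time.time() if now is None else now
        # Cache lifetime never exceeds the token's own expiry
        expires_at = min(now + self.ttl, float(payload.get("exp", now + self.ttl)))
        key = self._key(token)
        self._entries[key] = (expires_at, payload)
        self._entries.move_to_end(key)
        while len(self._entries) > self.max_entries:
            self._entries.popitem(last=False)  # evict the least recently used entry
```

A validation function would call `get` first and fall back to `jwt.decode` followed by `put` on a miss.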
3. Consistent error handling and secure defaults
Return the same status code and message for every token validation failure so that attackers cannot tell expired, malformed, and badly signed tokens apart. Timing differences are harder to remove; a fixed sleep does not equalize them, and a blocking sleep inside an async handler stalls the event loop, which makes abuse easier rather than harder.
    from fastapi import Depends, FastAPI, HTTPException, status
    from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
    import jwt

    app = FastAPI()
    SECRET_KEY = "your-secret-key"
    ALGORITHM = "HS256"
    security = HTTPBearer()


    def verify_token(token: str) -> dict:
        """Return the payload of a valid token; raise one generic 401 for every failure."""
        try:
            # ExpiredSignatureError is a subclass of InvalidTokenError, so a single
            # handler covers expired, malformed, and badly signed tokens alike
            return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except jwt.InvalidTokenError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid authentication credentials",
                headers={"WWW-Authenticate": "Bearer"},
            )


    @app.get("/profile")
    async def profile(credentials: HTTPAuthorizationCredentials = Depends(security)):
        verify_token(credentials.credentials)  # raises 401 unless the token is valid
        return {"profile": "data"}
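Uniform responses do not have to cost operators their diagnostics: the precise failure reason can go to the server log while the client always receives the same body. A small sketch of that split follows, using only the standard library. `AuthFailure`, `public_failure`, and the logger name `auth` are hypothetical names introduced for illustration.

```python
import logging
from dataclasses import dataclass

logger = logging.getLogger("auth")

PUBLIC_DETAIL = "Invalid authentication credentials"


@dataclass(frozen=True)
class AuthFailure:
    """What the client is allowed to see about a failed authentication."""
    status_code: int
    detail: str


def public_failure(reason: str, client_ip: str) -> AuthFailure:
    """Log the precise reason internally and return an identical response for all."""
    logger.warning("auth failure reason=%s ip=%s", reason, client_ip)
    return AuthFailure(status_code=401, detail=PUBLIC_DETAIL)
```

Whatever the internal reason string is, the value handed back to the client compares equal, which is the property an enumeration probe depends on breaking.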
These fixes address API abuse by tying limits to authenticated identities, reducing computational load via caching, and minimizing information leakage through error handling.