Ssrf in Fastapi with Jwt Tokens
Ssrf in Fastapi with Jwt Tokens — how this specific combination creates or exposes the vulnerability
Server-Side Request Forgery (SSRF) in FastAPI applications that rely on JWT tokens for authentication can emerge from a mismatch between trust boundaries and token handling. When an endpoint accepts user-supplied URLs and makes outbound HTTP requests, an attacker can supply a target that routes traffic through internal services, metadata endpoints, or SSRF-aware proxies. The presence of JWT tokens does not inherently prevent SSRF; if the server uses the token only for inbound authorization and then blindly forwards requests, the server becomes the attacker’s proxy.
Consider a FastAPI endpoint that retrieves user profile data from a downstream service. If the developer constructs the outbound request by concatenating user input without strict validation, an input like http://169.169.169.200/latest/meta-data/iam/security-credentials/ can direct the server to the cloud metadata service. Even when the request includes a JWT in the Authorization header for the client-facing API, the SSRF occurs at the server-to-server level, where the JWT is often not required or is passed as a bearer token to the internal target. This can expose internal endpoints, instance metadata, or services that only accept requests from localhost or internal networks.
JWT-specific risks intensify when token introspection or validation logic is co-located with outbound calls. For example, if the server decodes the JWT to extract claims and then uses those claims to build a URL or to decide which backend to call, an attacker who can influence the token (via a weak signing key or a misconfigured issuer) may indirectly steer the server’s outbound path. Additionally, if the application caches tokens or shares HTTP clients across requests, a compromised token or a token with broad scopes can amplify the impact of an SSRF by enabling lateral movement within the API mesh.
Common patterns that lead to SSRF with JWT in FastAPI include:
- Using user-controlled URL parameters to build request targets without validating the host or scheme.
- Forwarding JWTs to downstream services without verifying whether those services expect or require them, potentially bypassing intended network-level isolation.
- Relying on incomplete allowlists for destinations, which permits resolution of internal IPs, cloud metadata endpoints, or localhost-based services.
To detect such issues, scanning tools evaluate whether outbound requests respect strict destination allowlists, whether the JWT is used only for intended scopes, and whether internal endpoints are reachable from the server’s network. The presence of JWTs should not be conflated with network-level trust; scanners check whether token handling logic inadvertently exposes internal surfaces through unchecked user input.
Jwt Tokens-Specific Remediation in Fastapi — concrete code fixes
Remediation centers on strict input validation, network segmentation, and disciplined use of JWTs. Below are concrete FastAPI code examples that demonstrate secure patterns.
1. Validate and restrict outbound destinations
Never forward user input directly. Use a strict allowlist of hostnames and reject private IP ranges, localhost, and metadata addresses.
import re from urllib.parse import urlparse from fastapi import FastAPI, HTTPException, Depends import httpx app = FastAPI() ALLOWED_HOSTS = {"api.example.com", "data.example.com"} def is_allowed_host(url: str) -> bool: parsed = urlparse(url) if parsed.scheme not in {"https"}: return False host = parsed.hostname or "" if host in ALLOWED_HOSTS: return True # Reject private IPs and metadata-like patterns if re.match(r"^127\.\d+\.\d+\.\d+$", host): return False if re.match(r"^169\.254\.\d+\.\d+$", host): return False if re.match(r"^10\.", host) or re.match(r"^192\.168\.", host): return False if re.match(r"^172\.(1[6-9]|2[0-9]|3[01])\.", host): return False # Block cloud metadata endpoints if "169.254.169.254" in host or "metadata" in host.lower(): return False return False @app.get("/fetch-external") async def fetch_external(url: str, token: str = None): if not is_allowed_host(url): raise HTTPException(status_code=400, detail="Destination not allowed") headers = {} if token: headers["Authorization"] = f"Bearer {token}" async with httpx.AsyncClient() as client: resp = await client.get(url, headers=headers, timeout=10.0) resp.raise_for_status() return {"status": resp.status_code, "body": resp.text[:500]}2. Decouple JWT handling from destination selection
Do not derive the request URL from claims that could be manipulated. Use a fixed mapping or configuration instead.
from fastapi import FastAPI, HTTPException, Depends import httpx app = FastAPI() # Example: map a logical service name to a fixed, vetted endpoint SERVICE_ENDPOINTS = { "profile": "https://api.example.com/v1/profile", "settings": "https://api.example.com/v1/settings" } @app.get("/service/{service_name}") async def call_service(service_name: str, token: str = Depends(validate_jwt)): if service_name not in SERVICE_ENDPOINTS: raise HTTPException(status_code=400, detail="Invalid service") target = SERVICE_ENDPOINTS[service_name] headers = {"Authorization": f"Bearer {token}"} async with httpx.AsyncClient() as client: resp = await client.get(target, headers=headers, timeout=10.0) resp.raise_for_status() return resp.json() def validate_jwt(token: str = None): # Placeholder for actual JWT validation logic if not token: raise HTTPException(status_code=401, detail="Missing token") # Verify signature, issuer, scopes, etc. return token3. Enforce least-privilege tokens for downstream calls
If you must pass the JWT downstream, scope it to the minimal required permissions and avoid using the same token that guards the inbound API. Consider using short-lived tokens and explicit token exchange rather than simple forwarding.
import httpx async def get_limited_token(original_token: str) -> str: # Exchange the original token for a downstream-scoped token # This is a placeholder for your token exchange logic return "downstream_scoped_token" @app.get("/downstream-call") async def downstream_call(url: str = "https://api.example.com/v1/data", token: str = Depends(validate_jwt)): if not is_allowed_host(url): raise HTTPException(status_code=400, detail="Destination not allowed") downstream_token = await get_limited_token(token) async with httpx.AsyncClient() as client: resp = await client.get(url, headers={"Authorization": f"Bearer {downstream_token}"}, timeout=10.0) resp.raise_for_status() return resp.json()
Related CWEs: ssrf
| CWE ID | Name | Severity |
|---|---|---|
| CWE-918 | Server-Side Request Forgery (SSRF) | CRITICAL |
| CWE-441 | Unintended Proxy or Intermediary (Confused Deputy) | HIGH |