HIGH | race condition | fastapi | jwt tokens

Race Condition in FastAPI with JWT Tokens

Race Condition in FastAPI with JWT Tokens: how this specific combination creates or exposes the vulnerability

A race condition in FastAPI involving JWT tokens typically occurs when token validation and state-changing operations are not performed atomically. Consider an endpoint that first validates a JWT access token and then updates a resource based on claims in the token, such as a user identifier. If another request concurrently modifies the underlying state relied upon during validation or processing (for example, rotating signing keys, revoking a token via logout, or altering a “scope” claim stored in a database), the validation performed at the start of the request may no longer align with the current authorization context by the time the operation completes.

For example, an endpoint may decode a JWT to extract a user ID, check that the user has permission to act on a resource, and then perform a database update. Between the decode/check and the update, an attacker could trigger a concurrent request that modifies the resource or the user’s permissions. If the application does not re-validate permissions immediately before performing the update, the earlier JWT-based authorization decision may be applied to an outdated state. In systems where token validity depends on external state (e.g., a denylist or per-request scopes stored in a data store), failing to re-check that state atomically with the operation can lead to privilege escalation or unauthorized actions, which may be surfaced as findings in categories like BOLA/IDOR or Property Authorization during a scan.

FastAPI’s dependency system can inadvertently contribute to this pattern if dependencies decode and validate JWTs once per request but do not enforce re-validation before each sensitive write. Similarly, asynchronous background tasks that act on decoded payloads without confirming the latest state can introduce windows where the JWT’s assertions no longer match the actual permissions. A security scan testing authentication and authorization may flag these patterns under checks such as BOLA/IDOR or Property Authorization, especially when token usage intersects with shared mutable state.
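
The check-then-act window described above can be reproduced without FastAPI or a JWT library. The following is a minimal sketch, assuming an in-memory `permissions` dict in place of a database and a short `asyncio.sleep` in place of real I/O. The names `vulnerable_update`, `revoke`, and `audit_log` are hypothetical and exist only for this illustration.

```python
import asyncio

# Hypothetical in-memory state standing in for a database of permissions.
permissions = {"user-123": {"can_edit": True}}
audit_log = []

async def vulnerable_update(user_id: str) -> None:
    # Check: the authorization decision is made from state read at the start.
    allowed = permissions[user_id]["can_edit"]
    # Simulated I/O (for example, a slow query) that yields control to other tasks.
    await asyncio.sleep(0.01)
    # Act: the write relies on the earlier decision without re-checking it.
    if allowed:
        current = permissions[user_id]["can_edit"]
        audit_log.append(("write", user_id, current))

async def revoke(user_id: str) -> None:
    # A concurrent request removes the permission while the update is in flight.
    permissions[user_id]["can_edit"] = False

async def main() -> None:
    await asyncio.gather(vulnerable_update("user-123"), revoke("user-123"))

asyncio.run(main())
```

After the run, `audit_log` holds a write that was recorded even though the permission was already `False` at write time. The same ordering can occur when the check lives in a dependency and the write happens later in the endpoint or in a background task.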

JWT-Specific Remediation in FastAPI: concrete code fixes

To mitigate race conditions with JWT tokens in FastAPI, couple decoding and validation with the operation that depends on the claims, and avoid relying on stale decoded data. Prefer a short-lived access token strategy combined with a mechanism to detect invalidation, and ensure authorization checks occur immediately before performing state changes.

Example 1: Immediate re-validation within the endpoint

Decode the JWT, extract claims, and re-check permissions against the current data immediately before performing a write. This reduces the window where token-state mismatch can be exploited.

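from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel
from typing import Optional

SECRET_KEY = "your-secret-key"  # Placeholder: load from configuration or a secret store
ALGORITHM = "HS256"

app = FastAPI()

# Extracts the bearer token from the Authorization header; tokenUrl is a placeholder
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")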

class Item(BaseModel):
    name: str
    owner_id: str

def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: str = payload.get("sub")
        if user_id is None:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
        return {"user_id": user_id}
    except JWTError:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")

@app.put("/items/{item_id}")
def update_item(
    item_id: str,
    item_update: Item,
    current_user: dict = Depends(get_current_user)
):
    # Re-check ownership against the latest stored data immediately before the write.
    # Note: the read and the write are still separate steps; wrap them in one
    # transaction or use a conditional update if the store supports it.
    item = fetch_item_from_db(item_id)
    if item is None:
        raise HTTPException(status_code=404, detail="Item not found")
    if item["owner_id"] != current_user["user_id"]:
        raise HTTPException(status_code=403, detail="Not allowed")
    # Perform the update only after the ownership check has passed
    save_item_to_db(item_id, item_update)
    return {"status": "updated"}

def fetch_item_from_db(item_id: str) -> Optional[dict]:
    # Placeholder: replace with actual DB call
    return {"id": item_id, "owner_id": "user-123"}

def save_item_to_db(item_id: str, data: Item) -> None:
    # Placeholder: replace with actual DB call
    pass
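
The endpoint above narrows the window between the ownership check and the write, but the two are still separate steps. One way to close the gap is to fold the check into the write itself. The following is a minimal sketch, assuming a SQL store (an in-memory SQLite database here) and a hypothetical `items` table; `update_item_if_owner` is an illustrative helper, not part of FastAPI or any library.

```python
import sqlite3

# Hypothetical schema and seed data used only for this illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id TEXT PRIMARY KEY, name TEXT, owner_id TEXT)")
conn.execute("INSERT INTO items VALUES ('item-1', 'old name', 'user-123')")
conn.commit()

def update_item_if_owner(
    conn: sqlite3.Connection, item_id: str, new_name: str, user_id: str
) -> bool:
    """Apply the update only if the row is still owned by user_id.

    The ownership condition is part of the UPDATE statement, so the check
    and the write happen as a single statement in the database.
    """
    cur = conn.execute(
        "UPDATE items SET name = ? WHERE id = ? AND owner_id = ?",
        (new_name, item_id, user_id),
    )
    conn.commit()
    # rowcount is 1 when the row matched both conditions, 0 otherwise.
    return cur.rowcount == 1
```

In an endpoint, `user_id` would come from the validated token's `sub` claim, and a `False` result would map to an error response. A zero row count does not distinguish a missing item from a foreign owner, so returning 404 in both cases is a common choice that also avoids leaking which item IDs exist.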

Example 2: Short-lived tokens and denylist checks

Use short expiration times for access tokens and validate against a denylist or a per-request scope store immediately before sensitive actions. This helps ensure that token revocation or scope changes are reflected quickly.

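from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from datetime import datetime, timedelta, timezone
from typing import Optional

SECRET_KEY = "your-secret-key"  # Placeholder: load from configuration or a secret store
ALGORITHM = "HS256"

app = FastAPI()

# Extracts the bearer token from the Authorization header; tokenUrl is a placeholder
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Simplified denylist: in production, use a fast store like Redis with TTL aligned with token expiry
denylist = set()

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    # Use a timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)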

def is_token_revoked(token: str) -> bool:
    # In real systems, check a distributed denylist/cache
    return token in denylist

def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        if is_token_revoked(token):
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token revoked")
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: str = payload.get("sub")
        if user_id is None:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
        return {"user_id": user_id}
    except JWTError:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")

@app.delete("/tokens/revoke")
def revoke_token(token: str = Depends(oauth2_scheme)):
    denylist.add(token)
    return {"detail": "Token revoked"}
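
The in-process set above grows without bound and stores whole tokens. A common refinement is to key the denylist on a token identifier and let each entry lapse when the token itself expires. The following is a minimal sketch, assuming each token carries a `jti` claim (the tokens in the example above do not include one) and using an in-memory dict where a shared cache would be used in production. `ExpiringDenylist` is a hypothetical class written for this illustration.

```python
import time
from typing import Callable, Dict

class ExpiringDenylist:
    """In-memory denylist keyed by token ID, with entries that expire.

    Assumes every token has a unique `jti` claim and a numeric `exp` claim
    (seconds since the epoch). The clock is injectable to ease testing.
    """

    def __init__(self, clock: Callable[[], float] = time.time) -> None:
        self._clock = clock
        self._entries: Dict[str, float] = {}  # jti -> token expiry timestamp

    def revoke(self, jti: str, exp: float) -> None:
        # Remember the revocation only for as long as the token could be valid.
        self._entries[jti] = exp

    def is_revoked(self, jti: str) -> bool:
        exp = self._entries.get(jti)
        if exp is None:
            return False
        if exp <= self._clock():
            # The token has expired on its own, so signature and expiry
            # validation will reject it; the entry can be dropped.
            del self._entries[jti]
            return False
        return True

# Usage: revoke a token ID until the token's own expiry time.
revocations = ExpiringDenylist()
revocations.revoke("example-jti", exp=time.time() + 900)
```

Because an expired entry reports as not revoked, this check is only safe alongside normal `exp` validation during decoding, which the `jwt.decode` call in the example performs by default.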

Operational guidance

  • Keep access token lifetimes short to reduce the impact of leaked tokens and to limit the validity window for race conditions tied to token-state mismatches.
  • For sensitive operations, re-check authorization against the latest state immediately before the write, rather than relying solely on claims extracted from a JWT earlier in the request lifecycle.
  • Use fast revocation stores (e.g., distributed caches) for denylists to make invalidation timely and consistent across instances.

These practices align with checks such as Authentication, Property Authorization, and BOLA/IDOR that a scan may highlight when token handling intersects with mutable authorization state.

Frequently Asked Questions

Can a race condition with JWT tokens lead to privilege escalation in FastAPI?
Yes. If authorization based on a JWT token is not re-validated immediately before a state-changing operation, an attacker can exploit timing windows to act on outdated permissions, potentially escalating privileges.
How does middleBrick assess race conditions involving JWT tokens?
middleBrick runs checks such as Authentication, Property Authorization, and BOLA/IDOR against the unauthenticated attack surface. Findings may highlight patterns where token validation and resource modification are not tightly coupled, pointing to potential race conditions.