Insecure Direct Object Reference in Fastapi
How Insecure Direct Object Reference Manifests in Fastapi
Insecure Direct Object Reference (IDOR) occurs when an application exposes a reference to an internal implementation object, such as a database key, filename, or URL. In Fastapi applications, IDOR vulnerabilities often manifest through predictable primary keys, sequential IDs, or direct database object references that attackers can manipulate.
The most common Fastapi IDOR pattern involves CRUD endpoints that directly use path parameters as database identifiers without proper authorization checks. Consider this vulnerable Fastapi endpoint:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
app = FastAPI()
Base = declarative_base()
engine = create_engine("sqlite:///./test.db")
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
name = Column(String)
email = Column(String)
Base.metadata.create_all(bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.get("/users/{user_id}", response_model=User)
async def read_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return userThis endpoint is vulnerable because it only checks if the user exists, not whether the requester has permission to view that specific user's data. An attacker can simply increment the user_id parameter to access any user in the database.
Another Fastapi-specific IDOR pattern occurs with Pydantic models that expose internal IDs. When using Pydantic's BaseModel with Fastapi, developers might inadvertently expose sensitive identifiers:
class UserResponse(BaseModel):
id: int
name: str
email: str
# Sensitive fields that shouldn't be exposed
internal_system_id: str
database_row_id: int
@app.get("/users/{user_id}", response_model=UserResponse)
async def read_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return UserResponse(
id=user.id,
name=user.name,
email=user.email,
internal_system_id=user.internal_system_id, # Exposed!
database_row_id=user.database_row_id # Exposed!
)File-based IDOR vulnerabilities also appear in Fastapi applications. When endpoints accept file paths or IDs that map to filesystem resources:
@app.get("/files/{file_id}")
async def get_file(file_id: str):
file_path = f"/var/www/uploads/{file_id}.txt"
if not os.path.exists(file_path):
raise HTTPException(status_code=404, detail="File not found")
return FileResponse(file_path)This allows path traversal attacks where an attacker can request files like "../../../../etc/passwd" or access files they shouldn't have permission to view.
Fastapi's dependency injection system can also introduce IDOR vulnerabilities when dependencies don't properly validate permissions. A common mistake is injecting a database session without verifying the user's access rights:
@app.get("/accounts/{account_id}")
async def get_account(account_id: int, db: Session = Depends(get_db)):
account = db.query(Account).filter(Account.id == account_id).first()
if account is None:
raise HTTPException(status_code=404, detail="Account not found")
return accountThe vulnerability here is that any authenticated user can access any account by changing the account_id parameter, with no verification that they own or have permission to view that account.
Fastapi-Specific Detection
Detecting IDOR vulnerabilities in Fastapi applications requires both static code analysis and dynamic testing. For static analysis, look for patterns where path parameters or query parameters are directly used as database identifiers without authorization checks.
Using middleBrick's API scanning capabilities, you can detect IDOR vulnerabilities in your Fastapi endpoints. middleBrick's black-box scanning tests the unauthenticated attack surface by systematically manipulating identifiers:
# Scan your Fastapi API with middleBrick
middlebrick scan https://your-fastapi-app.com
# Or use the CLI for detailed JSON output
middlebrick scan --format json https://your-fastapi-app.com/api/users/1middleBrick tests IDOR by attempting to access sequential IDs and comparing responses. If it can access different users' data by simply changing numeric IDs, it flags this as a BOLA (Broken Object Level Authorization) vulnerability, which is the OWASP category for IDOR.
For OpenAPI spec analysis, middleBrick examines your Fastapi's auto-generated OpenAPI documentation to identify endpoints that accept identifiers without proper security schemes. Fastapi automatically generates OpenAPI specs, which middleBrick can analyze:
{
"paths": {
"/users/{user_id}": {
"get": {
"tags": ["users"],
"summary": "Get a specific user",
"parameters": [
{
"name": "user_id",
"in": "path",
"required": true,
"schema": {"type": "integer"}
}
],
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/User"}
}
}
}
}
}
}
}
}middleBrick's LLM/AI security scanning also detects IDOR in AI-related endpoints. If your Fastapi app includes LLM endpoints that accept user IDs or conversation IDs:
@app.post("/chat/{conversation_id}")
async def get_conversation(conversation_id: str, db: Session = Depends(get_db)):
conversation = db.query(Conversation).filter(Conversation.id == conversation_id).first()
if conversation is None:
raise HTTPException(status_code=404, detail="Conversation not found")
return conversation.messagesmiddleBrick tests these endpoints by attempting to access other users' conversations through ID manipulation, a critical security concern for AI applications handling sensitive data.
Dynamic testing with tools like Burp Suite or OWASP ZAP can complement middleBrick's scanning. These tools can automatically test IDOR by:
- Capturing requests with valid IDs
- Systematically changing ID parameters to test access control
- Analyzing response differences to detect unauthorized access
For Fastapi specifically, watch for endpoints using SQLAlchemy's get() method without authorization:
@app.get("/items/{item_id}")
async def read_item(item_id: int, db: Session = Depends(get_db)):
item = db.query(Item).get(item_id) # Vulnerable if no auth check
if item is None:
raise HTTPException(status_code=404, detail="Item not found")
return itemThis pattern is particularly dangerous because it's concise and easy to miss during code review.
Fastapi-Specific Remediation
Remediating IDOR vulnerabilities in Fastapi requires implementing proper authorization checks and avoiding direct object references. The most effective approach is to use Fastapi's dependency injection system to enforce authorization at the dependency level.
First, create a dependency that verifies the current user's permissions for a specific resource:
from fastapi import Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import TypeVar
T = TypeVar('T')
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id = payload.get("sub")
if user_id is None:
raise credentials_exception
token_data = TokenData(user_id=user_id)
except JWTError:
raise credentials_exception
user = db.query(User).filter(User.id == token_data.user_id).first()
if user is None:
raise credentials_exception
return user
def get_current_active_user(current_user: User = Depends(get_current_user)):
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
def get_resource_with_authorization(
resource_id: int,
resource_type: str,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Generic dependency for authorized resource access"""
resource = db.query(globals()[resource_type]).filter_by(id=resource_id).first()
if resource is None:
raise HTTPException(status_code=404, detail=f"{resource_type} not found")
# Check if user has permission to access this resource
if not has_permission(current_user, resource):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Not authorized to access {resource_type}"
)
return resourceThen use this dependency in your endpoints:
@app.get("/users/{user_id}")
async def read_user(
user: User = Depends(get_resource_with_authorization, resource_id=Depends(), resource_type="User")
):
return userFor more specific authorization logic, implement resource-specific permission checks:
def has_permission(user: User, resource: object) -> bool:
"""Check if user has permission to access resource"""
if isinstance(resource, User):
return user.id == resource.id # Users can only access their own data
elif isinstance(resource, Account):
return resource.user_id == user.id # Users can only access their own accounts
elif isinstance(resource, Document):
# Users can access documents they own or that are public
return resource.owner_id == user.id or resource.is_public
return FalseAnother effective pattern is using Fastapi's HTTPException with proper status codes to prevent information leakage:
@app.get("/accounts/{account_id}")
async def get_account(
account_id: int,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
account = db.query(Account).filter(Account.id == account_id).first()
if account is None:
# Don't reveal whether account exists or not
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Account not found or access denied"
)
if account.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Account not found or access denied"
)
return accountFor file-based resources, implement secure file access patterns:
from fastapi import UploadFile, File
from pathlib import Path
@app.get("/user-files/{filename}")
async def get_user_file(
filename: str,
current_user: User = Depends(get_current_active_user)
):
# Only allow access to files in the user's directory
user_dir = Path(f"/var/www/uploads/{current_user.id}")
file_path = user_dir / filename
if not file_path.exists() or not file_path.is_file():
raise HTTPException(status_code=404, detail="File not found")
# Ensure the file is within the user's directory (prevent path traversal)
if not file_path.resolve().startswith(user_dir.resolve()):
raise HTTPException(status_code=403, detail="Forbidden")
return FileResponse(file_path)Consider using UUIDs instead of sequential integers for identifiers:
import uuid
from sqlalchemy import Column, String
class User(Base):
__tablename__ = "users"
id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
name = Column(String)
email = Column(String)This makes it significantly harder for attackers to guess valid IDs. Combine this with proper authorization checks for comprehensive protection.
Finally, integrate middleBrick's continuous monitoring into your development workflow to catch IDOR vulnerabilities early:
# GitHub Actions workflow
name: Security Scan
on: [push, pull_request]
jobs:
security:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Run middleBrick Scan
run: |
npm install -g middlebrick
middlebrick scan https://staging.your-app.com/api
continue-on-error: true
- name: Fail if score below B
run: |
SCORE=$(middlebrick scan https://staging.your-app.com/api --format json | jq '.score')
if [ $SCORE -lt 80 ]; then
echo "Security score below threshold: $SCORE"
exit 1
fiThis ensures IDOR vulnerabilities are caught before deployment, maintaining your API's security posture.
Related CWEs: bolaAuthorization
| CWE ID | Name | Severity |
|---|---|---|
| CWE-250 | Execution with Unnecessary Privileges | HIGH |
| CWE-639 | Insecure Direct Object Reference | CRITICAL |
| CWE-732 | Incorrect Permission Assignment | HIGH |