NoSQL Injection in FastAPI
How NoSQL Injection Manifests in FastAPI
NoSQL injection in FastAPI applications typically occurs when user input is incorporated into NoSQL database queries without validation. FastAPI is commonly paired with MongoDB through async drivers such as Motor, and a few recurring patterns in that stack create attack vectors that developers must understand.
The most common scenario is an endpoint that passes request data directly to a database method. Consider this vulnerable FastAPI endpoint:
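
    from typing import Any, Dict

    from fastapi import Body, FastAPI, HTTPException
    from motor.motor_asyncio import AsyncIOMotorClient

    app = FastAPI()
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.my_database

    @app.post("/users/search")
    async def search_users(filters: Dict[str, Any] = Body(...)):
        query = filters  # Direct user input, operators included
        # Exclude _id so the documents serialize to JSON
        results = await db.users.find(query, {"_id": 0}).to_list(10)
        return results

An attacker can exploit this by sending a JSON body that contains query operators instead of plain values:

    POST /users/search HTTP/1.1
    Host: localhost:8000
    Content-Type: application/json

    {"name": {"$ne": 1}}

    POST /users/search HTTP/1.1
    Host: localhost:8000
    Content-Type: application/json

    {"name": {"$ne": 1}, "age": {"$lt": 20}}

This bypasses the intended filter by using MongoDB's query operators. The $ne operator matches every document whose name is not equal to 1, which in practice exposes the entire collection (capped here at 10 results by to_list). On a login endpoint, the same trick can bypass a password check. Note that the query-string form of this payload (GET /users?name[$ne]=1) does not work against a parameter declared as name: str, because FastAPI does not expand bracket syntax into nested objects and binds the value as a plain string. The risk arises wherever a handler forwards an untyped dict to the database.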
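To see why the shape of the parsed input matters, the following minimal sketch compares a bracket-style query string with a JSON body using only the standard library. It assumes the server keeps query-string keys as literal strings, which is how FastAPI (through Starlette) parses them. The helper name has_operator is hypothetical and exists only for this illustration.

```python
import json
from urllib.parse import parse_qsl


def has_operator(query: dict) -> bool:
    """Return True if any value is a dict with a $-prefixed key."""
    return any(
        isinstance(value, dict) and any(key.startswith("$") for key in value)
        for value in query.values()
    )


# Bracket syntax in a query string stays a flat, literal key.
from_query_string = dict(parse_qsl("name[$ne]=1"))

# A JSON body can carry a nested object, which MongoDB reads as an operator.
from_json_body = json.loads('{"name": {"$ne": 1}}')

print(from_query_string)                # {'name[$ne]': '1'}
print(has_operator(from_query_string))  # False
print(has_operator(from_json_body))     # True
```

The flat key name[$ne] can only match a field literally named that way, whereas the nested object is the shape an operator needs to take effect.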
FastAPI's Pydantic model binding narrows this risk, but it does not remove it when queries are built dynamically:

    from fastapi import Depends
    from pydantic import BaseModel

    class UserQuery(BaseModel):
        name: str
        age: int

    @app.get("/users")
    async def get_users(query: UserQuery = Depends()):
        results = await db.users.find(query.dict(), {"_id": 0}).to_list(10)
        return results

Pydantic validates types, so name and age arrive here as a plain string and an integer, and MongoDB treats a string value such as "$ne" as a literal rather than an operator. That protection disappears as soon as a field is typed as dict or Any, or the handler merges raw request data into the query: Pydantic accepts the nested object, and the database interprets its $-prefixed keys as operators.
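The difference between a literal value and an operator object can be illustrated without a database. The sketch below is a toy in-memory matcher, not MongoDB code: it assumes simplified semantics for equality, $ne, $gt, and $lt, and the USERS data, matches, and find names are all hypothetical.

```python
USERS = [
    {"name": "alice", "age": 31},
    {"name": "bob", "age": 17},
    {"name": "carol", "age": 45},
]


def matches(doc: dict, query: dict) -> bool:
    """Toy matcher: equality plus $ne, $gt and $lt (simplified semantics)."""
    for field, condition in query.items():
        actual = doc.get(field)
        if isinstance(condition, dict):
            # A dict value is read as a set of operators.
            for op, operand in condition.items():
                if op == "$ne":
                    ok = actual != operand
                elif op == "$gt":
                    ok = actual is not None and actual > operand
                elif op == "$lt":
                    ok = actual is not None and actual < operand
                else:
                    raise ValueError(f"unsupported operator: {op}")
                if not ok:
                    return False
        elif actual != condition:
            # Any other value is compared literally.
            return False
    return True


def find(query: dict) -> list:
    return [doc for doc in USERS if matches(doc, query)]


literal = find({"name": "$ne"})        # string value: compared literally
injected = find({"name": {"$ne": 1}})  # dict value: treated as an operator

print(len(literal))   # 0
print(len(injected))  # 3
```

A typed str field can only ever produce the first kind of query, which is why the field's declared type matters more than the characters in its value.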
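Another FastAPI-specific pattern involves path parameters used in queries:

    @app.get("/users/{user_id}")
    async def get_user(user_id: str):
        query = {"_id": user_id}
        user = await db.users.find_one(query)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        return user

A path segment such as /users/[$ne=null] reaches this handler as the literal string "[$ne=null]", so it cannot act as an operator on its own. The pattern becomes exploitable when the handler decodes the value before querying (for example with json.loads) or interpolates it into a $where expression. Even without injection, an unvalidated string never matches ObjectId keys, so the ID format should still be checked.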
FastAPI-Specific Detection
Detecting NoSQL injection in FastAPI requires understanding both the framework's patterns and the underlying database interactions. Here's how to identify these vulnerabilities in your FastAPI applications.
Code Review Patterns: Look for these red flags in your FastAPI codebase:

    # VULNERABLE - Query built directly from request data
    query = {"field": request_data}
    await db.collection.find(query).to_list(10)

    # VULNERABLE - Operator query built from unvalidated values
    query = {"$or": [{"field": value}, {"field2": value2}]}

    # VULNERABLE - Path parameter used without format validation
    @app.get("/items/{item_id}")
    async def get_item(item_id: str):
        await db.items.find_one({"_id": item_id})

Automated Scanning with middleBrick: middleBrick's black-box scanning approach is well suited to FastAPI applications because it tests the actual runtime behavior without requiring source code access.
The scanner sends crafted payloads to FastAPI endpoints and analyzes responses for NoSQL injection indicators:

    # Install and use middleBrick CLI
    npm install -g middlebrick
    middlebrick scan http://localhost:8000/api/users

    # Output includes:
    # - NoSQL injection vulnerability detection
    # - Risk score (A-F) for each endpoint
    # - Specific findings with remediation guidance
    # - API inventory showing all discovered endpoints

middleBrick tests 12 security categories, including NoSQL injection, by sending payloads with MongoDB operators like $ne, $gt, $lt, $regex, and $where. It analyzes whether the application returns unexpected data or error messages that reveal database structure.
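For manual testing alongside a scanner, operator probes can be generated by hand. The sketch below is not middleBrick's implementation; it is a small assumed approach in which each probe swaps a plain value for an operator object and a result is flagged when the probe returns more rows than a harmless baseline. The names OPERATOR_PROBES, build_probes, and looks_injectable are hypothetical, and $where is left out because it executes JavaScript on the server.

```python
import json

# Operator -> operand pairs chosen to match broadly if the operator is honored.
OPERATOR_PROBES = {
    "$ne": 1,
    "$gt": "",
    "$lt": "~",
    "$regex": ".*",
}


def build_probes(field: str) -> list:
    """Return JSON bodies that replace a plain value with an operator object."""
    return [
        json.dumps({field: {op: operand}})
        for op, operand in OPERATOR_PROBES.items()
    ]


def looks_injectable(baseline_count: int, probe_count: int) -> bool:
    """Flag a probe that returns more rows than a harmless baseline request."""
    return probe_count > baseline_count


for body in build_probes("name"):
    print(body)
```

Each body can be sent to an endpoint that accepts JSON, and only against systems you are authorized to test. A 400 or 422 response suggests the operators are being rejected as intended.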
Runtime Monitoring: For FastAPI applications in production, implement request logging and anomaly detection:

    import logging

    from fastapi import Request

    logger = logging.getLogger("nosql_monitor")
    SUSPICIOUS_OPERATORS = ["$ne", "$gt", "$lt", "$regex", "$where"]

    # Function-style middleware is registered with the decorator;
    # app.add_middleware() expects a middleware class instead.
    @app.middleware("http")
    async def nosql_injection_middleware(request: Request, call_next):
        # Check query-string keys and values (JSON bodies are not inspected here)
        query = dict(request.query_params)
        if any(op in str(query) for op in SUSPICIOUS_OPERATORS):
            # Log and alert
            logger.warning("Potential NoSQL injection attempt: %s", query)
        response = await call_next(request)
        return response

FastAPI-Specific Remediation
Remediating NoSQL injection in FastAPI requires a defense-in-depth approach. Here are FastAPI-specific techniques to secure your NoSQL database interactions.
1. Input Validation with Pydantic Models: Create strict validation models that reject special characters:

    from typing import Optional

    from fastapi import Depends
    from pydantic import BaseModel, constr, validator  # Pydantic v1-style API

    class UserQuery(BaseModel):
        name: constr(min_length=1, max_length=50, strip_whitespace=True)
        age: Optional[int] = None

        @validator('name')
        def name_cannot_contain_operators(cls, v):
            if any(char in v for char in '$(){}[].'):
                raise ValueError('Invalid characters in name')
            return v

    @app.get("/users")
    async def get_users(query: UserQuery = Depends()):
        # Safe query construction: drop optional fields that were not supplied
        query_dict = query.dict(exclude_none=True)
        results = await db.users.find(query_dict, {"_id": 0}).to_list(10)
        return results

2. Whitelist-Based Query Building: Only allow specific fields and operators:
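
    from fastapi import HTTPException, Request

    # Allowed fields and the type each value is coerced to
    ALLOWED_FIELDS = {"name": str, "age": int, "email": str}
    # Custom operator names mapped to the MongoDB operators the server emits
    ALLOWED_OPERATORS = {"eq": "$eq", "gt": "$gt", "lt": "$lt"}

    @app.get("/users")
    async def get_users(request: Request):
        query = {}
        # Accepts ?name=alice or ?age__gt=20; the "__op" suffix is this example's own syntax
        for key, value in request.query_params.items():
            field, _, op = key.partition("__")
            if field not in ALLOWED_FIELDS:
                raise HTTPException(status_code=400, detail="Invalid field")
            if (op or "eq") not in ALLOWED_OPERATORS:
                raise HTTPException(status_code=400, detail="Invalid operator")
            try:
                typed_value = ALLOWED_FIELDS[field](value)
            except ValueError:
                raise HTTPException(status_code=400, detail="Invalid value")
            # Only server-chosen operators ever reach the database
            query.setdefault(field, {})[ALLOWED_OPERATORS[op or "eq"]] = typed_value
        results = await db.users.find(query, {"_id": 0}).to_list(10)
        return results

3. Use FastAPI Dependency Injection for Security: Create reusable security dependencies: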
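
    from typing import Any, Dict

    from fastapi import Body, Depends, HTTPException

    def _check(value: Any) -> None:
        """Recursively reject operator keys and suspicious strings."""
        if isinstance(value, dict):
            for key, nested in value.items():
                # Covers top-level keys such as $where as well as nested operators
                if key.startswith('$'):
                    raise HTTPException(
                        status_code=400,
                        detail=f"Operator {key} not allowed"
                    )
                _check(nested)
        elif isinstance(value, list):
            for item in value:
                _check(item)
        elif isinstance(value, str):
            # Conservative: this also rejects legitimate values such as email addresses
            if any(char in value for char in '$(){}[].'):
                raise HTTPException(
                    status_code=400,
                    detail="Invalid characters in query"
                )

    def validate_nosql_query(query: Dict[str, Any] = Body(...)) -> Dict[str, Any]:
        """Dependency to validate NoSQL queries sent as a JSON body"""
        _check(query)
        return query

    # A dict parameter is read from the request body, so the route uses POST
    @app.post("/users/search")
    async def search_users(query: Dict[str, Any] = Depends(validate_nosql_query)):
        results = await db.users.find(query, {"_id": 0}).to_list(10)
        return results

4. Parameterized Queries with PyMongo: MongoDB drivers such as PyMongo and Motor have no SQL-style parameterized queries, but you can create safe query builders: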

    from bson import ObjectId
    from bson.errors import InvalidId
    from fastapi import HTTPException

    async def safe_find_one(collection, _id: str):
        try:
            # Validate ObjectId format before touching the database
            obj_id = ObjectId(_id)
        except (InvalidId, TypeError):
            raise HTTPException(status_code=400, detail="Invalid ID format")
        return await collection.find_one({"_id": obj_id})

    @app.get("/users/{user_id}")
    async def get_user(user_id: str):
        user = await safe_find_one(db.users, user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        user["_id"] = str(user["_id"])  # ObjectId is not JSON serializable
        return user

5. Rate Limiting and Monitoring: Combine rate limiting (here through the third-party slowapi package) with security monitoring:
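
    import time
    from collections import defaultdict, deque

    from fastapi import FastAPI, Request
    from fastapi.responses import JSONResponse
    from slowapi import Limiter, _rate_limit_exceeded_handler
    from slowapi.errors import RateLimitExceeded
    from slowapi.util import get_remote_address

    app = FastAPI()
    # slowapi enforces per-route limits declared with @limiter.limit(...)
    limiter = Limiter(key_func=get_remote_address)
    app.state.limiter = limiter
    app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

    SUSPICIOUS_CHARS = set('$(){}[].')
    SUSPICIOUS_LIMIT = 10  # suspicious requests allowed per client per window
    WINDOW_SECONDS = 60
    suspicious_hits = defaultdict(deque)  # in-memory, per process

    @app.middleware("http")
    async def security_middleware(request: Request, call_next):
        # Check query-string keys and values for suspicious patterns
        text = "".join(k + v for k, v in request.query_params.multi_items())
        if SUSPICIOUS_CHARS & set(text):
            # Rate limit suspicious requests with a sliding window
            hits = suspicious_hits[get_remote_address(request)]
            now = time.monotonic()
            while hits and now - hits[0] > WINDOW_SECONDS:
                hits.popleft()
            hits.append(now)
            if len(hits) > SUSPICIOUS_LIMIT:
                return JSONResponse(status_code=429, content={"detail": "Rate limit exceeded"})
        response = await call_next(request)
        return response

Frequently Asked Questions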
How can I test my FastAPI application for NoSQL injection vulnerabilities?
Run middlebrick scan <your-api-url>. The scanner sends crafted payloads with MongoDB operators to test for injection vulnerabilities. It analyzes responses for data leakage and provides a security score with specific findings. For manual testing, try requests with operators like $ne, $gt, and $regex to see if they're properly rejected.
Does FastAPI's Pydantic validation prevent NoSQL injection?
Not on its own. Pydantic keeps fields typed as str or int scalar, but operators like $ne or $gt can still reach the database through fields typed as dict or Any, or through raw request data that bypasses the model. You need additional validation to reject operator keys and unexpected special characters.
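One way to add that validation is to accept only scalar values of an expected type for a known set of fields. The following is a minimal sketch under stated assumptions: the ALLOWED_TYPES schema and the build_safe_filter helper are hypothetical, and a real handler would translate the ValueError into a 400 response.

```python
ALLOWED_TYPES = {"name": str, "age": int, "email": str}  # hypothetical schema


def build_safe_filter(raw: dict) -> dict:
    """Keep only known fields whose values are scalars of the expected type."""
    safe = {}
    for field, value in raw.items():
        expected = ALLOWED_TYPES.get(field)
        if expected is None:
            raise ValueError(f"unknown field: {field}")
        # bool is a subclass of int, so it is rejected explicitly
        if isinstance(value, bool) or not isinstance(value, expected):
            raise ValueError(f"{field} must be a plain {expected.__name__}")
        safe[field] = value
    return safe


print(build_safe_filter({"name": "alice", "age": 30}))

try:
    build_safe_filter({"name": {"$ne": 1}})
except ValueError as exc:
    print(exc)  # name must be a plain str
```

Because a dict can never satisfy the type check, operator objects are rejected regardless of which operator they carry, and top-level keys such as $where fail the field lookup.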