LDAP Injection in FastAPI
How LDAP Injection Manifests in FastAPI
LDAP injection in FastAPI applications typically occurs when user input is interpolated directly into LDAP search filters without escaping. FastAPI's async request handling and common authentication patterns shape how the flaw appears and how attackers exploit it.
The most common scenario combines FastAPI's dependency injection system with LDAP-based authentication. Consider this vulnerable pattern:
from fastapi import FastAPI, Depends, HTTPException
from ldap3 import Server, Connection, ALL

app = FastAPI()

def get_ldap_connection():
    server = Server('ldap://your-ldap-server.com', get_info=ALL)
    conn = Connection(server, user='cn=admin,dc=example,dc=com', password='admin_password')
    if not conn.bind():
        raise HTTPException(status_code=500, detail='LDAP connection failed')
    return conn

@app.post('/login')
async def login(username: str, password: str, conn: Connection = Depends(get_ldap_connection)):
    # Vulnerable: user input is interpolated into the filter unescaped
    search_filter = f'(uid={username})'
    conn.search(search_base='dc=example,dc=com', search_filter=search_filter)
    if not conn.entries:
        raise HTTPException(status_code=401, detail='Invalid credentials')
    user_dn = conn.entries[0].entry_dn
    return conn.rebind(user=user_dn, password=password)
The vulnerability lies in the search_filter = f'(uid={username})' line. An attacker can inject LDAP filter operators like *, |, or & to bypass authentication or extract sensitive directory information.
Common FastAPI LDAP injection payloads:
- username=admin*)(|(uid=* - bypasses authentication by creating an always-true filter
- username=admin*)(!uid=* - excludes valid users
- username=admin*)(uid=admin* - wildcard search for admin accounts
- username=admin*)(objectClass=* - retrieves all directory entries
- username=admin*)(g=* - causes LDAP server errors revealing internal structure
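To make the effect of such payloads concrete, the following minimal sketch reproduces the string interpolation from the vulnerable handler without contacting a directory server. The `build_filter` helper is hypothetical and exists only for illustration; how a given LDAP server treats the resulting string varies by implementation.

```python
def build_filter(username: str) -> str:
    """Mimic the vulnerable f-string interpolation from the login handler."""
    return f'(uid={username})'


# A benign username yields a single equality assertion.
print(build_filter('alice'))  # (uid=alice)

# A crafted username closes the first assertion and appends another one,
# so the attacker controls the filter structure rather than just a value.
print(build_filter('admin*)(objectClass=*'))  # (uid=admin*)(objectClass=*)
```

The point is that the parentheses and wildcard supplied by the user become part of the filter syntax instead of being treated as literal characters.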
FastAPI's async request handling can exacerbate these issues. If a single LDAP connection is shared across concurrent requests without proper isolation, operations from different requests can interleave, and differences in response time or error behavior may let an attacker infer which usernames exist.
Another FastAPI-specific pattern involves using Pydantic models for input validation, which can create a false sense of security:
from pydantic import BaseModel

class LoginRequest(BaseModel):
    username: str
    password: str

@app.post('/login')
async def login(login_data: LoginRequest, conn: Connection = Depends(get_ldap_connection)):
    # Still vulnerable despite Pydantic validation
    search_filter = f'(uid={login_data.username})'
    # ... rest of vulnerable code
Pydantic validates type and format but doesn't sanitize LDAP injection characters, leaving the application vulnerable.
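One way to narrow this gap is an explicit allowlist check on the username. The sketch below uses only the standard library; the `validate_username` helper and the character policy (letters, digits, dot, underscore, hyphen, up to 64 characters) are assumptions for illustration and would need to match your directory's actual naming rules.

```python
import re

# Assumed policy: 1-64 characters drawn from letters, digits, '.', '_' and '-'.
USERNAME_PATTERN = re.compile(r'[A-Za-z0-9._-]{1,64}')


def validate_username(value: str) -> str:
    """Return the username unchanged if it matches the allowlist, else raise."""
    if not USERNAME_PATTERN.fullmatch(value):
        raise ValueError('username contains unsupported characters')
    return value


print(validate_username('alice.smith'))  # alice.smith
```

A function like this could be called from a Pydantic field validator on the request model. It complements filter escaping rather than replacing it, since an allowlist that is later loosened would silently reopen the injection path.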
FastAPI-Specific Detection
Detecting LDAP injection in FastAPI requires both static code analysis and runtime scanning. middleBrick's API security scanner includes specialized checks for LDAP vulnerabilities in FastAPI applications.
middleBrick's LDAP injection detection for FastAPI applications includes:
- Pattern matching for LDAP query construction in FastAPI route handlers
- Detection of direct string interpolation in search filters
- Analysis of dependency injection patterns that might expose LDAP connections
- Scanning for common LDAP injection payloads in request parameters
- Checking for improper error handling that might leak directory information
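As a rough illustration of what static detection of string interpolation in search filters can look like, the sketch below walks a Python syntax tree and flags f-strings assigned to variables whose name contains "filter". It is a simplified heuristic written for this article, not a description of how middleBrick works internally, and the `find_interpolated_filters` name is hypothetical.

```python
import ast


def find_interpolated_filters(source: str) -> list[int]:
    """Return line numbers where an f-string is assigned to a '*filter*' name."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Assign):
            continue
        # ast.JoinedStr is the syntax-tree node for an f-string.
        if not isinstance(node.value, ast.JoinedStr):
            continue
        for target in node.targets:
            if isinstance(target, ast.Name) and 'filter' in target.id.lower():
                hits.append(node.lineno)
    return hits


sample = (
    "def login(username):\n"
    "    search_filter = f'(uid={username})'\n"
    "    return search_filter\n"
)
print(find_interpolated_filters(sample))  # [2]
```

A heuristic like this misses filters built with `+`, `%` or `str.format`, and it cannot tell whether the interpolated value was escaped first, so it is best treated as a prompt for manual review.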
The scanner tests your FastAPI endpoints with a battery of LDAP injection payloads, monitoring for:
- Authentication bypass (returning success with invalid credentials)
- Information disclosure (revealing valid usernames or directory structure)
- Server errors that expose LDAP schema or configuration
- Timing differences that indicate successful injection
middleBrick's LLM/AI security module also checks for AI-specific LDAP injection scenarios, such as when FastAPI applications use AI models to generate LDAP queries or when AI-generated code contains LDAP injection vulnerabilities.
For manual testing, you can use tools like Burp Suite or OWASP ZAP with these FastAPI-specific LDAP payloads:
# Test authentication bypass
curl -X POST 'https://your-fastapi-app.com/login' \
  -H 'Content-Type: application/json' \
  -d '{"username": "admin*)(|(uid=*", "password": "invalid"}'
middleBrick's CLI tool provides FastAPI-specific reporting:
middlebrick scan https://your-fastapi-app.com --fastapi --ldap
# Or integrate into the FastAPI development workflow
middlebrick scan http://localhost:8000 --fastapi --ldap --fail-on-high-risk
The scanner's OpenAPI analysis feature examines your FastAPI application's auto-generated OpenAPI spec to identify endpoints that handle user input potentially used in LDAP queries, then correlates this with runtime scanning results.
FastAPI-Specific Remediation
Remediating LDAP injection in FastAPI requires a defense-in-depth approach using FastAPI's native features and proper LDAP query construction techniques.
LDAP filters have no direct equivalent of parameterized SQL queries, so the most effective approach is to escape user input before it reaches the filter and to manage the connection through FastAPI's dependency injection:
from fastapi import FastAPI, Depends, HTTPException
from ldap3 import Server, Connection, ALL, SUBTREE
from ldap3.core.exceptions import LDAPException
from ldap3.utils.conv import escape_filter_chars

app = FastAPI()

class LDAPConnection:
    def __init__(self):
        self.server = Server('ldap://your-ldap-server.com', get_info=ALL)
        self.admin_conn = None

    async def __aenter__(self):
        # Simple bind with the service account DN (NTLM would need a DOMAIN\user name instead)
        self.admin_conn = Connection(self.server, user='cn=admin,dc=example,dc=com', password='admin_password')
        if not self.admin_conn.bind():
            raise HTTPException(status_code=500, detail='LDAP connection failed')
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if self.admin_conn:
            self.admin_conn.unbind()

    def search_user(self, username: str):
        # Critical: escape LDAP filter characters
        safe_username = escape_filter_chars(username)
        search_filter = f'(uid={safe_username})'
        try:
            self.admin_conn.search(
                search_base='dc=example,dc=com',
                search_filter=search_filter,
                search_scope=SUBTREE,
                # The DN is not an attribute; read it from entry_dn instead
                attributes=['uid', 'cn', 'mail']
            )
            return self.admin_conn.entries
        except LDAPException:
            raise HTTPException(status_code=500, detail='LDAP search failed')

@app.post('/login')
async def login(username: str, password: str, ldap: LDAPConnection = Depends()):
    # Depends() only instantiates the class; entering the context binds and later unbinds
    async with ldap:
        entries = ldap.search_user(username)
        if not entries:
            raise HTTPException(status_code=401, detail='Invalid credentials')
        user_dn = entries[0].entry_dn
    try:
        # Verify the password by binding as the user that was found
        user_conn = Connection(ldap.server, user=user_dn, password=password)
        if not user_conn.bind():
            raise HTTPException(status_code=401, detail='Invalid credentials')
        user_conn.unbind()
        return {'message': 'Login successful'}
    except LDAPException:
        raise HTTPException(status_code=401, detail='Invalid credentials')
Key FastAPI-specific remediation techniques:
- Use context managers: The LDAPConnection class uses async context management to ensure connections are properly closed, which helps prevent resource exhaustion.
- Escape filter characters: Always use escape_filter_chars() from ldap3.utils.conv on user input before incorporating it into LDAP filters.
- Implement proper error handling: Don't reveal whether a username exists or provide detailed LDAP error messages to attackers.
- Use dependency injection: FastAPI's dependency system gives each request its own LDAPConnection instance, which keeps connection lifecycle management in one place.
- Add rate limiting: FastAPI has no built-in rate limiter, so use a third-party library such as slowapi to slow down brute-force and repeated injection attempts.
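To show what filter escaping does to an injection payload, here is a standard-library sketch of the escaping rules from RFC 4515. The `escape_filter_value` function is an illustrative stand-in written for this article, not ldap3's implementation; in application code, prefer the library's own `escape_filter_chars()`.

```python
# Illustrative stand-in for an LDAP filter escaper, following RFC 4515.
_ESCAPES = {
    '\\': r'\5c',
    '*': r'\2a',
    '(': r'\28',
    ')': r'\29',
    '\x00': r'\00',
}


def escape_filter_value(value: str) -> str:
    """Replace filter metacharacters with their backslash-hex escapes."""
    # Mapping character by character avoids re-escaping inserted backslashes.
    return ''.join(_ESCAPES.get(ch, ch) for ch in value)


payload = 'admin*)(objectClass=*'
print(f'(uid={escape_filter_value(payload)})')
# (uid=admin\2a\29\28objectClass=\2a)
```

After escaping, the parentheses and wildcards are matched as literal characters inside a single `uid` assertion, so the payload can no longer change the structure of the filter.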
Additional FastAPI-specific security measures:
from fastapi import FastAPI, Depends, Request
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

app = FastAPI()

# Add trusted host middleware
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=['your-domain.com', 'localhost']
)

# Add rate limiting
limiter = Limiter(key_func=get_remote_address)
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.state.limiter = limiter

@app.post('/login')
@limiter.limit('5/minute')
# slowapi needs the `request: Request` parameter to identify the client
async def login(request: Request, username: str, password: str, ldap: LDAPConnection = Depends()):
    # ... secure LDAP code (LDAPConnection is the class from the remediation example)
    pass
For comprehensive protection, integrate middleBrick's continuous monitoring into your FastAPI CI/CD pipeline:
# .github/workflows/security.yml
name: Security Scan
on: [push, pull_request]
jobs:
  security:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run middleBrick LDAP injection scan
        run: |
          npm install -g middlebrick
          middlebrick scan ${{ secrets.API_URL }} \
            --fastapi \
            --ldap \
            --fail-on-high-risk \
            --output json > security-report.json
      - name: Upload report
        uses: actions/upload-artifact@v4
        with:
          name: security-report
          path: security-report.json