Cache Poisoning in Flask
How Cache Poisoning Manifests in Flask
Cache poisoning in Flask applications occurs when malicious data is injected into the cache, causing subsequent requests to receive corrupted or manipulated responses. This vulnerability is particularly dangerous because it can affect multiple users without requiring authentication.
In Flask, cache poisoning typically manifests through several attack vectors:
Query Parameter Manipulation: Attackers can exploit how Flask applications generate cache keys. Consider this vulnerable pattern:
@app.route('/api/user')
def get_user():
    user_id = request.args.get('id', '1')
    # The raw query value goes straight into the cache key
    cache_key = f"user_{user_id}"
    cached_data = cache.get(cache_key)
    if cached_data:
        return cached_data
    user = db.get_user(user_id)
    response = jsonify(user)
    cache.set(cache_key, response.data, timeout=300)
    return response

Because the key is built from the raw id value with no validation, an attacker can craft requests like /api/user?id=1%0A or use other special characters to create cache collisions, causing legitimate users to receive poisoned responses.
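One way such a collision can arise is when the code that builds the cache key normalizes its input differently from the code that loads the data. The following framework-free sketch illustrates this under stated assumptions: lossy_key, lookup_user, RECORDS and CACHE are hypothetical stand-ins for a key builder, a database lookup, a database and a cache backend, and none of them come from Flask or from the route above.

```python
# Illustrative stand-ins for a database and a cache backend (assumptions).
RECORDS = {"1": {"id": "1", "name": "alice"}}
CACHE = {}


def lossy_key(user_id: str) -> str:
    """Hypothetical key builder that strips whitespace before building the key."""
    return f"user_{user_id.strip()}"


def lookup_user(user_id: str) -> dict:
    """Hypothetical lookup that uses the raw, un-normalized value."""
    return RECORDS.get(user_id, {"error": "not found"})


def get_user(user_id: str) -> dict:
    """Cache-aside read in which the key and the lookup disagree about the input."""
    key = lossy_key(user_id)
    if key in CACHE:
        return CACHE[key]
    CACHE[key] = lookup_user(user_id)
    return CACHE[key]


# The attacker's request (id=1%0A decodes to "1\n") is stored under "user_1".
attacker_view = get_user("1\n")
# A legitimate request for id=1 now hits the entry the attacker created.
victim_view = get_user("1")
```

In this sketch the victim receives the attacker's "not found" payload even though a record for id 1 exists, because both requests resolve to the same key.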
Header-Based Cache Poisoning: Flask applications often use request headers for caching decisions:
@app.route('/api/data')
def get_data():
    accept = request.headers.get('Accept', 'application/json')
    cache_key = f"data_{accept}"
    cached = cache.get(cache_key)
    if cached:
        return cached
    data = fetch_data()
    cache.set(cache_key, data, timeout=60)
    return data

An attacker can send requests with crafted Accept headers that cause the application to cache malicious content under legitimate cache keys.
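A common way to limit this vector is to map the header onto a small, fixed set of values before it reaches the key. The sketch below is a simplified illustration, not a full content-negotiation implementation: SUPPORTED_TYPES, DEFAULT_TYPE, normalize_accept and data_cache_key are hypothetical names, and q-value ordering is deliberately ignored.

```python
SUPPORTED_TYPES = ("application/json", "text/html")  # assumed set of formats
DEFAULT_TYPE = "application/json"


def normalize_accept(header_value):
    """Map a raw Accept header onto one of a fixed set of media types."""
    if not header_value:
        return DEFAULT_TYPE
    for part in header_value.split(","):
        # Drop parameters such as ";q=0.9" and compare case-insensitively.
        media_type = part.split(";", 1)[0].strip().lower()
        if media_type in SUPPORTED_TYPES:
            return media_type
    return DEFAULT_TYPE


def data_cache_key(header_value):
    """Build the cache key from the normalized value, never the raw header."""
    return f"data_{normalize_accept(header_value)}"


# Four header variants an attacker might send for the same resource.
crafted = ["application/json", "application/json;x=1", "APPLICATION/JSON", "evil/type"]
raw_keys = {f"data_{value}" for value in crafted}
normalized_keys = {data_cache_key(value) for value in crafted}
```

With raw headers every variant creates its own cache entry, while the normalized form keeps the key space bounded by the number of supported types.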
Authentication Bypass via Cache Poisoning: In Flask applications using Flask-Login or similar libraries:
@app.route('/api/profile')
def get_profile():
    user_id = session.get('user_id', 'guest')
    cache_key = f"profile_{user_id}"
    cached = cache.get(cache_key)
    if cached:
        return cached
    profile = db.get_profile(user_id)
    cache.set(cache_key, profile, timeout=600)
    return profile

If an attacker can manipulate the session or force a cache hit for a privileged user's profile, they can access unauthorized data.
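In the route above, every visitor without a session shares the key profile_guest, and any value found in the session is trusted as a key component. A minimal sketch of a stricter key builder follows; it assumes user identifiers are positive integers, which the original code does not state, and profile_cache_key is a hypothetical helper name.

```python
from typing import Mapping, Optional


def profile_cache_key(session: Mapping[str, object]) -> Optional[str]:
    """Return a per-user cache key, or None when the response should not be cached."""
    user_id = session.get("user_id")
    # Assumption: real user ids are positive integers. bool is excluded
    # explicitly because it is a subclass of int in Python.
    if isinstance(user_id, bool) or not isinstance(user_id, int) or user_id <= 0:
        return None
    return f"profile_{user_id}"
```

A caller would skip the cache entirely when the function returns None, so anonymous or malformed sessions never read or write a shared entry.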
Response Header Manipulation: Flask's response objects can be manipulated to poison cache metadata:
@app.route('/api/unsafe')
def unsafe_endpoint():
    response = make_response(jsonify({'data': 'safe'}))
    response.headers['X-Custom-Header'] = request.args.get('header', '')
    cache.set('unsafe_response', response.get_data(), timeout=300)
    return response

Attackers can inject headers that affect how browsers or proxies interpret cached responses.
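If a request value really has to be reflected into a response header, one conservative option is to accept only a narrow token format and fall back to a constant otherwise. The sketch below assumes that short alphanumeric tokens are sufficient for the header in question; safe_header_value and its default are hypothetical.

```python
import re

# Assumption: legitimate values are short tokens of letters, digits, "_" and "-".
_TOKEN_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")


def safe_header_value(raw, default="none"):
    """Return raw only if it is a plain token; otherwise return a fixed default."""
    if raw is not None and _TOKEN_RE.fullmatch(raw):
        return raw
    return default
```

Using fullmatch rather than search matters here: a value such as "ok" followed by CR/LF and a second header would pass a partial match but fails a full one.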
Flask-Specific Detection
Detecting cache poisoning in Flask applications requires examining both code patterns and runtime behavior. Here's how to identify these vulnerabilities:
Static Code Analysis: Look for these Flask-specific patterns:
# Vulnerable pattern - using unsanitized request data in cache keys
@cache.memoize()
def vulnerable_function(user_input):
    return process_data(user_input)

Scan for cache operations that use request parameters, headers, or session data without proper validation.
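Part of this scan can be automated with Python's standard ast module. The sketch below only flags functions that take arguments and carry a decorator named memoize or cached (the two Flask-Caching decorator names); it does not trace where the arguments come from, so each hit still needs manual review. The helper names and the SAMPLE source are illustrative.

```python
import ast

CACHE_DECORATORS = {"memoize", "cached"}


def _decorator_name(node):
    """Return the trailing name of a decorator such as @cache.memoize(...)."""
    if isinstance(node, ast.Call):
        node = node.func
    if isinstance(node, ast.Attribute):
        return node.attr
    if isinstance(node, ast.Name):
        return node.id
    return None


def find_cached_functions(source):
    """Return (name, line) pairs for cache-decorated functions that take arguments."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        names = {_decorator_name(d) for d in node.decorator_list}
        if names & CACHE_DECORATORS and node.args.args:
            findings.append((node.name, node.lineno))
    return findings


SAMPLE = '''
@cache.memoize()
def vulnerable_function(user_input):
    return process_data(user_input)

@app.route("/health")
def health():
    return "ok"
'''

flagged = [name for name, _ in find_cached_functions(SAMPLE)]
```

The source is only parsed, never executed, so the scan can run over untrusted or incomplete modules as long as they are syntactically valid.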
middleBrick API Scanning: The middleBrick scanner specifically tests for cache poisoning by:
- Analyzing cache key generation patterns in your Flask routes
- Testing for header-based cache manipulation
- Checking for improper session handling in cached responses
- Examining how your application handles special characters in cache keys
- Testing for response splitting vulnerabilities that can poison cache metadata
middleBrick's black-box scanning approach tests your Flask application's unauthenticated attack surface without requiring credentials or code access.
Runtime Detection: Implement logging to detect cache poisoning attempts:
import logging
import re
from urllib.parse import unquote

logger = logging.getLogger(__name__)

CACHE_POISONING_PATTERNS = [
    re.compile(r'[\r\n]+'),               # CRLF injection
    re.compile(r'[\u2028\u2029]'),        # Unicode line breaks
    re.compile(r'[\x00-\x1f\x7f-\x9f]'),  # Unicode control chars (C0 and C1)
]

Add middleware to log suspicious cache key patterns:

class CachePoisoningDetector:
    """WSGI middleware that logs requests whose path or query contains control characters."""

    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        path = environ.get('PATH_INFO', '')
        # QUERY_STRING is still percent-encoded, so decode it before matching
        query = unquote(environ.get('QUERY_STRING', ''))
        # Check for suspicious patterns
        if any(p.search(path) or p.search(query) for p in CACHE_POISONING_PATTERNS):
            # %r escapes control characters so the log entry cannot be forged
            logger.warning("Suspicious cache key pattern detected: %r?%r", path, query)
        return self.app(environ, start_response)

Cache Key Analysis: Examine how your Flask application generates cache keys:
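def analyze_cache_keys(app):
    """List GET routes with URL variables as candidates for manual cache-key review."""
    vulnerable_routes = []
    for rule in app.url_map.iter_rules():
        if 'GET' in rule.methods:
            # A '<...>' converter means user-controlled input reaches the view.
            # This is a heuristic only; it does not inspect the cache calls themselves.
            if any(param in rule.rule for param in ['<', '>']):
                vulnerable_routes.append(rule.rule)
    return vulnerable_routes

Flask-Specific Remediation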
Securing Flask applications against cache poisoning requires a multi-layered approach. Here are Flask-specific remediation strategies:
Input Validation and Sanitization: Always validate cache key inputs:
import re
from urllib.parse import quote

def sanitize_cache_key(input_str):
    # Remove dangerous characters (C0 and C1 control ranges, including \r, \n and \t)
    sanitized = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', input_str)
    # Limit length
    sanitized = sanitized[:100]
    # URL encode whatever remains
    return quote(sanitized, safe='')

Use this in your Flask routes:
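from flask import abort, jsonify, request

@app.route('/api/user')
def get_user():
    user_id = request.args.get('id', '1')
    safe_id = sanitize_cache_key(user_id)
    # Reject input that sanitization changed, so two different ids never share a key
    if safe_id != user_id:
        abort(400)
    cache_key = f"user_{safe_id}"
    cached_data = cache.get(cache_key)
    if cached_data:
        # The cache holds the raw JSON body, so restore the content type on a hit
        return app.response_class(cached_data, mimetype='application/json')
    user = db.get_user(user_id)
    response = jsonify(user)
    cache.set(cache_key, response.data, timeout=300)
    return response

Context-Aware Caching: Use Flask's context to create safe cache keys: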
from flask import request

def create_safe_cache_key(base_key, request_data=None):
    # Must be called while a request context is active
    context = {
        'user_agent': request.headers.get('User-Agent', 'unknown')[:50],
        'client_ip': request.remote_addr,
        'endpoint': request.endpoint
    }
    if request_data:
        context.update(request_data)
    # Create deterministic key
    key_parts = [base_key]
    for k, v in sorted(context.items()):
        key_parts.append(f"{k}:{str(v)[:50]}")
    return ":".join(key_parts)

Secure Cache Libraries: Use Flask-Caching with proper configuration:
from flask_caching import Cache

cache = Cache(config={
    # 'RedisCache' is the current backend name; the short form 'redis' is deprecated
    'CACHE_TYPE': 'RedisCache',
    'CACHE_REDIS_URL': 'redis://localhost:6379',
    'CACHE_DEFAULT_TIMEOUT': 300,
    'CACHE_KEY_PREFIX': 'flask_app_',
    'CACHE_OPTIONS': {
        'socket_connect_timeout': 5,
        'socket_timeout': 5
    }
})

Response Validation: Validate responses before caching:
def validate_before_cache(response, max_size=1024*1024):
    # Check content type
    if 'Content-Type' not in response.headers:
        raise ValueError("Missing Content-Type header")
    # Check the size in bytes, before decoding
    if len(response.get_data()) > max_size:
        raise ValueError("Response too large for cache")
    # Check for script tags or dangerous content (case-insensitive)
    content = response.get_data(as_text=True).lower()
    if '<script' in content or 'javascript:' in content:
        raise ValueError("Suspicious content detected")
    return response

Cache Invalidation Strategy: Implement proper cache invalidation:
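from flask import current_app

class SafeCacheManager:
    def __init__(self, cache):
        self.cache = cache
        # Held in process memory, so each worker process tracks its own version
        self.version = 1

    def get_versioned_key(self, base_key):
        return f"{base_key}_v{self.version}"

    def invalidate_all(self):
        # Bumping the version orphans existing keys; old entries expire by timeout.
        # current_app requires an active application context.
        self.version += 1
        current_app.logger.info(f"Cache invalidated, new version: {self.version}")

Testing Cache Security: Add security tests to your Flask test suite: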
import pytest
from your_app import create_app

def test_cache_poisoning_protection():
    app = create_app()
    with app.test_client() as client:
        # Test with malicious input: it should be sanitized (200) or rejected (400)
        response = client.get('/api/user?id=1%0A')
        assert response.status_code in (200, 400)
        assert '<script>' not in response.get_data(as_text=True)
        # Test cache hit with safe data
        safe_response = client.get('/api/user?id=1')
        assert safe_response.status_code == 200
        # Verify cache keys are sanitized.
        # Assumes the Cache instance is stored in app.config['CACHE'] and uses the
        # in-memory SimpleCache backend, whose private _cache dict holds the keys.
        cache = app.config['CACHE']
        keys = cache.cache._cache.keys()
        for key in keys:
            assert '\n' not in key
            assert '\r' not in key
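
The test above checks that stored keys contain no CR or LF characters. A complementary property worth testing is that two different inputs never end up under the same key, since stripping or truncating input can merge them. The sketch below shows one possible approach, hashing the raw value with SHA-256 from the standard library; hashed_cache_key and check_keys are hypothetical helpers, not part of Flask or Flask-Caching.

```python
import hashlib


def hashed_cache_key(prefix, raw_value):
    """Derive a fixed-length key from the full raw value, without stripping or truncating."""
    digest = hashlib.sha256(raw_value.encode("utf-8")).hexdigest()
    return f"{prefix}_{digest}"


def check_keys(inputs):
    """Return True if every distinct input gets a distinct, printable key."""
    keys = [hashed_cache_key("user", value) for value in inputs]
    all_printable = all(key.isprintable() for key in keys)
    return len(set(keys)) == len(set(inputs)) and all_printable


# Inputs that a strip-and-truncate sanitizer could merge into one key.
lookalikes = ["1", "1\n", "1\r\n", " 1"]
all_distinct = check_keys(lookalikes)
```

Because the digest is hexadecimal, the resulting key never contains control characters, and its length stays constant regardless of how long the input is.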