DNS Rebinding in Flask
How DNS Rebinding Manifests in Flask
DNS rebinding attacks exploit the gap between the moment a Flask application resolves a hostname and the moment it applies a security policy to the result. When a Flask app accepts a user-supplied hostname and resolves it without validating the address it actually connects to, attackers can manipulate DNS records to bypass IP-based security controls.
The attacker registers a domain that initially resolves to an IP address they control, typically with a very short TTL, then switches the DNS record to point to an address inside the victim's internal network. Flask applications that make outbound requests to user-supplied domains without validating the final resolved IP are vulnerable.
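To make the timing gap concrete, here is a minimal simulation. It does not touch real DNS: `RebindingResolver` is a hypothetical in-memory stand-in for an attacker-controlled name server, and `naive_fetch_target` is an illustrative name for code that validates one lookup and then connects using a second one.

```python
import ipaddress


class RebindingResolver:
    """Hypothetical stand-in for an attacker-controlled DNS server."""

    def __init__(self, answers):
        self._answers = list(answers)
        self.lookups = 0

    def resolve(self, hostname):
        # Return the next scripted answer; repeat the last one once exhausted.
        index = min(self.lookups, len(self._answers) - 1)
        self.lookups += 1
        return self._answers[index]


def is_public(ip_str):
    """True only for globally routable addresses."""
    return ipaddress.ip_address(ip_str).is_global


def naive_fetch_target(resolver, hostname):
    """Validate with one lookup, then 'connect' using a second lookup."""
    checked_ip = resolver.resolve(hostname)   # lookup 1: validation
    if not is_public(checked_ip):
        raise ValueError("blocked")
    return resolver.resolve(hostname)         # lookup 2: actual connection


# First answer is public, every later answer is internal.
resolver = RebindingResolver(["93.184.216.34", "192.168.1.1"])
connected_ip = naive_fetch_target(resolver, "attacker-domain.com")
print(connected_ip)
```

The validation step only ever sees the public address, yet the address used for the connection is the internal one, which is the core of the rebinding problem.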
Consider this Flask endpoint that accepts a webhook URL:

from flask import Flask, request, jsonify
import requests

app = Flask(__name__)


@app.route('/setup-webhook', methods=['POST'])
def setup_webhook():
    data = request.json
    url = data['webhook_url']
    # Vulnerable to DNS rebinding: the hostname is resolved at request time
    # and the resulting IP address is never checked
    response = requests.get(url)
    return jsonify(success=True)

An attacker crafts a URL like http://attacker-domain.com whose DNS record initially resolves to their public IP, and the Flask app connects successfully. The attacker then changes the record to point to 192.168.1.1 (for example, the victim's gateway). When the Flask app retries or refreshes the webhook, it connects to the internal network instead.
Another common pattern in Flask is accepting IP addresses for API endpoints:

@app.route('/api/proxy', methods=['POST'])
def proxy_request():
    target_ip = request.json['target_ip']
    port = request.json.get('port', 80)
    url = f'http://{target_ip}:{port}'
    response = requests.get(url)  # No IP validation
    return jsonify(response.json())

This gives an attacker direct access to the internal network as soon as they learn or guess internal IP addresses through other means.
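One way to see which targets such a proxy should refuse is to classify the supplied address with the standard-library `ipaddress` module. The sketch below is illustrative only: the helper name `is_internal_address` is hypothetical, and the set of categories it rejects is an assumption that a real deployment may need to widen or narrow.

```python
import ipaddress


def is_internal_address(value):
    """Return True for values a public-facing proxy should not connect to."""
    try:
        ip = ipaddress.ip_address(value)
    except ValueError:
        # Not an IP literal (for example a hostname): reject in this sketch.
        return True
    return (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_multicast
        or ip.is_reserved
        or ip.is_unspecified
    )
```

In the proxy endpoint above, a check such as `if is_internal_address(target_ip): return jsonify(error='Target not allowed'), 400` before building the URL would reject gateway, loopback, and cloud metadata addresses.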
Flask applications that implement internal service discovery are particularly vulnerable. A typical pattern:

import socket


def discover_service(service_name):
    # Internal DNS lookup; the returned address is trusted without any check
    return socket.gethostbyname(service_name + '.internal')

If an attacker manages to register or otherwise control a name such as malicious.internal and points it to their IP, the Flask app resolves that name and connects to the attacker-controlled service.
Flask-Specific Detection
Detecting DNS rebinding in Flask requires examining how your application handles network requests and DNS resolution. The middleBrick API security scanner specifically tests for this vulnerability by attempting DNS rebinding attacks against your endpoints.
middleBrick's approach includes:
- Scanning endpoints that accept URLs or hostnames in request parameters
- Testing with domains that resolve to public IPs, then rapidly changing to private IP ranges
- Checking if the application follows redirects or retries requests
- Analyzing response patterns to detect successful internal network access
For manual detection in Flask, examine your codebase for these patterns:

import inspect
import re
import sys


def scan_for_dns_rebinding_vulns(app):
    vulnerable_patterns = [
        r'requests\.get\s*\([^)]*\b(?:url|hostname|domain|host)\b[^)]*\)',
        r'socket\.gethostbyname\s*\([^)]*\b(?:host|domain|url)\b[^)]*\)',
        r'urllib\.request\.urlopen\s*\([^)]*\b(?:url|hostname)\b[^)]*\)',
    ]
    # inspect.getsource() cannot read a Flask instance, so read the module
    # that created the app instead
    source_code = inspect.getsource(sys.modules[app.import_name])
    matches = []
    for pattern in vulnerable_patterns:
        # Non-capturing groups make findall() return the full matched call
        matches.extend(re.findall(pattern, source_code))
    return matches

middleBrick's LLM security module also detects whether your Flask app serves AI/ML endpoints that might be vulnerable to DNS rebinding when making external API calls to model providers or data sources.
Key indicators to search for in your Flask application:
- Direct use of requests.get() or requests.post() with user-supplied URLs
- Dynamic URL construction without validation
- Socket-level operations on user input
- External API calls without IP whitelisting
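As a complement to text search, the indicators above can also be looked for with Python's `ast` module, which avoids false matches in comments and strings. This is a rough sketch, not a complete analyzer: the `SINKS` set and the function names are hypothetical, and it only flags calls whose first positional argument is something other than a literal.

```python
import ast

# Call names treated as outbound-request sinks (illustrative, incomplete list).
SINKS = {
    "requests.get",
    "requests.post",
    "socket.gethostbyname",
    "urllib.request.urlopen",
}


def dotted_name(node):
    """Rebuild 'a.b.c' from nested Attribute/Name nodes, or return None."""
    parts = []
    while isinstance(node, ast.Attribute):
        parts.append(node.attr)
        node = node.value
    if isinstance(node, ast.Name):
        parts.append(node.id)
        return ".".join(reversed(parts))
    return None


def find_dynamic_sink_calls(source):
    """Return (line, call_name) for sink calls whose first argument is not a literal."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call) or not node.args:
            continue
        name = dotted_name(node.func)
        if name in SINKS and not isinstance(node.args[0], ast.Constant):
            findings.append((node.lineno, name))
    return sorted(findings)


sample = (
    "import requests\n"
    'requests.get("https://example.com")\n'
    "requests.get(url)\n"
)
print(find_dynamic_sink_calls(sample))
```

Only the call on line 3 of the sample is reported, because its argument is a variable rather than a fixed string. Aliased imports such as `from requests import get` are not covered by this sketch.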
Flask-Specific Remediation
Securing Flask against DNS rebinding requires implementing IP validation and request controls. Here are Flask-specific remediation strategies:
First, implement IP address validation before making outbound requests:

from urllib.parse import urlparse

from flask import Flask, request, jsonify
import requests
import socket
import ipaddress

app = Flask(__name__)

# Define trusted IP ranges
TRUSTED_IPS = [
    ipaddress.ip_network('203.0.113.0/24'),   # Your API provider
    ipaddress.ip_network('198.51.100.0/24'),  # Another trusted service
]


def is_trusted_ip(ip_str):
    try:
        ip = ipaddress.ip_address(ip_str)
    except ValueError:
        return False
    return any(ip in network for network in TRUSTED_IPS)


@app.route('/setup-webhook', methods=['POST'])
def setup_webhook():
    data = request.json
    url = data['webhook_url']
    # Extract hostname and resolve
    hostname = urlparse(url).hostname
    if not hostname:
        return jsonify(error='Invalid hostname'), 400
    try:
        ips = socket.gethostbyname_ex(hostname)[2]
    except socket.gaierror:
        return jsonify(error='Invalid hostname'), 400
    # Require every resolved address to be trusted, not just one of them
    if not all(is_trusted_ip(ip) for ip in ips):
        return jsonify(error='Untrusted IP address'), 400
    # Caveat: requests resolves the hostname again here, so a rebinding
    # window remains unless the connection is pinned to a validated IP
    response = requests.get(url, timeout=10)
    return jsonify(success=True)

For Flask applications using SQLAlchemy or other database connections, ensure connection strings never incorporate user input:

from flask import current_app
from sqlalchemy import create_engine


def get_database_engine():
    # Never construct from user input
    db_config = current_app.config['DATABASE']
    connection_string = f"postgresql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
    return create_engine(connection_string)

Implement request timeouts and limits to reduce the attack window:

from flask import Blueprint, jsonify, request
import requests

api = Blueprint('api', __name__)


@api.route('/proxy', methods=['POST'])
def proxy():
    data = request.json
    url = data['url']
    # Validate URL scheme
    if not url.startswith(('http://', 'https://')):
        return jsonify(error='Invalid URL scheme'), 400
    try:
        response = requests.get(
            url,
            timeout=(3.05, 27),  # Connect timeout, read timeout
            allow_redirects=False,  # Do not follow redirects to other hosts
            headers={'User-Agent': 'middleBrick-Security-Check/1.0'}
        )
        return jsonify({
            'status_code': response.status_code,
            'content': response.text[:1000]  # Truncate the body returned to the caller
        })
    except requests.exceptions.Timeout:
        return jsonify(error='Request timeout'), 504
    except requests.exceptions.RequestException:
        return jsonify(error='Request failed'), 500

Consider using Flask's before_request decorator for global protection:

from urllib.parse import urlparse


@app.before_request
def validate_external_requests():
    if request.endpoint in ['vulnerable_endpoint']:
        # Apply additional validation for specific routes
        data = request.get_json(silent=True) or {}
        if 'external_url' in data:
            url = data['external_url']
            hostname = urlparse(url).hostname
            # is_allowed_hostname() is an application-defined allowlist check
            if not is_allowed_hostname(hostname):
                return jsonify(error='External requests not allowed'), 403
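
The webhook check shown earlier resolves the hostname for validation and then lets requests resolve it a second time, so the address that was checked is not necessarily the address that gets used. One possible way to narrow that gap is to resolve once, validate every answer, and then connect to the validated IP directly while sending the original hostname in the Host header. The following is a sketch under stated assumptions: the helper names `resolve_public_ip` and `pin_http_url` are hypothetical, it treats any non-global address as forbidden, and it handles plain HTTP only.

```python
import ipaddress
import socket
from urllib.parse import urlsplit, urlunsplit


def resolve_public_ip(hostname, port, getaddrinfo=socket.getaddrinfo):
    """Resolve once and return one address, refusing if any answer is non-public."""
    infos = getaddrinfo(hostname, port, type=socket.SOCK_STREAM)
    addresses = [info[4][0] for info in infos]
    if not addresses:
        raise ValueError("no addresses returned")
    for address in addresses:
        if not ipaddress.ip_address(address).is_global:
            raise ValueError(f"refusing non-public address {address}")
    return addresses[0]


def pin_http_url(url, getaddrinfo=socket.getaddrinfo):
    """Return (pinned_url, headers) that target the validated IP directly.

    Plain HTTP only: pinning HTTPS also needs SNI and certificate checks
    against the original hostname, which this sketch does not attempt.
    """
    parts = urlsplit(url)
    if parts.scheme != "http" or not parts.hostname:
        raise ValueError("only absolute http:// URLs are handled in this sketch")
    port = parts.port or 80
    ip = resolve_public_ip(parts.hostname, port, getaddrinfo)
    host = f"[{ip}]" if ":" in ip else ip  # bracket IPv6 literals
    pinned = urlunsplit(("http", f"{host}:{port}", parts.path or "/", parts.query, ""))
    # Keep the original name in the Host header so virtual hosting still works.
    host_header = parts.hostname if port == 80 else f"{parts.hostname}:{port}"
    return pinned, {"Host": host_header}
```

A caller could then issue `requests.get(pinned_url, headers=headers, timeout=10, allow_redirects=False)`, so that no second DNS lookup happens between validation and connection. The `getaddrinfo` parameter exists only so the resolver can be swapped out in tests.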