SQL Injection in Flask
How SQL Injection Manifests in Flask
SQL injection in Flask applications typically occurs when database queries are built with string concatenation or Python string formatting rather than parameterized queries. Flask's simplicity and flexibility can inadvertently lead developers to write vulnerable code, especially when handling user input from forms, URL parameters, or JSON payloads.
The most common Flask-specific pattern involves using string formatting directly in SQLAlchemy queries:
from flask import Flask, request
from sqlalchemy import create_engine

app = Flask(__name__)
engine = create_engine('sqlite:///example.db')

@app.route('/search')
def search():
    username = request.args.get('username')
    # VULNERABLE: user input is interpolated directly into the SQL string
    query = f"SELECT * FROM users WHERE username = '{username}'"
    with engine.connect() as conn:
        # exec_driver_sql() hands the raw string to the driver (SQLAlchemy 1.4+)
        result = conn.exec_driver_sql(query)
        return str(list(result))
This code is vulnerable because an attacker can manipulate the username parameter to inject arbitrary SQL. For example, a request to /search?username=admin' OR '1'='1 would return all users instead of just the admin user.
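To make the effect of that payload concrete, here is a minimal sketch that reproduces the same flaw with the standard-library sqlite3 module instead of Flask and SQLAlchemy. The in-memory table and its three rows are assumptions made up for the demonstration.

```python
import sqlite3

# Hypothetical in-memory table standing in for the users table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.executemany(
    "INSERT INTO users (username) VALUES (?)",
    [("admin",), ("alice",), ("bob",)],
)

def vulnerable_search(username):
    # Same flaw as the route: user input is pasted into the SQL text.
    query = f"SELECT username FROM users WHERE username = '{username}'"
    return query, [row[0] for row in conn.execute(query)]

benign_query, benign_rows = vulnerable_search("admin")
attack_query, attack_rows = vulnerable_search("admin' OR '1'='1")

print(attack_query)  # the payload has become part of the WHERE clause
print(benign_rows)   # only the requested user
print(attack_rows)   # every row in the table
```

The quote in the payload closes the string literal early, so the rest of the input is parsed as SQL rather than compared as data.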
Another common vulnerability arises when raw SQL is passed through SQLAlchemy's text() function without bound parameters. Wrapping an f-string in text() does not parameterize anything:
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import text

app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///example.db'
db = SQLAlchemy(app)

@app.route('/profile')
def profile():
    user_id = request.args.get('id')
    # VULNERABLE: text() wraps the string, but the value is still interpolated
    query = text(f"SELECT * FROM users WHERE id = {user_id}")
    result = db.session.execute(query)
    return str(result.fetchone())
Dynamic table or column names in Flask applications also create injection opportunities:
from flask import jsonify

@app.route('/data')
def data():
    table = request.args.get('table')
    column = request.args.get('column')
    # VULNERABLE: identifiers come straight from the query string
    query = text(f"SELECT {column} FROM {table} WHERE active = 1")
    result = db.session.execute(query)
    return jsonify([dict(row._mapping) for row in result])
This pattern is particularly dangerous because attackers can access any table or column in your database schema.
Flask-Specific Detection
Detecting SQL injection vulnerabilities in Flask requires examining how database queries are constructed throughout your application. middleBrick's black-box scanning approach tests your Flask API endpoints without requiring source code access or credentials.
When middleBrick scans a Flask endpoint, it automatically tests for SQL injection by submitting payloads that attempt to manipulate query structure. For the vulnerable search endpoint example above, middleBrick would test payloads like:
# Test for boolean-based injection
/search?username=admin' OR '1'='1
# Test for UNION-based injection
/search?username=1' UNION SELECT 1,2,3--
# Test for time-based injection (WAITFOR DELAY is SQL Server syntax)
/search?username=1' WAITFOR DELAY '0:0:5'--
middleBrick analyzes the HTTP responses for indicators of successful injection, such as:
- Different response structures or content when injecting boolean payloads
- SQL error messages in the response body
- Significant response time delays with time-based payloads
- Unexpected data exposure in the response
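The first indicator, a response that changes under boolean payloads, can be illustrated with a small differential check. This is a sketch of the general technique only, not middleBrick's implementation. The two endpoint functions are hypothetical stand-ins for HTTP handlers, backed by an in-memory sqlite3 table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.executemany(
    "INSERT INTO users (username) VALUES (?)", [("admin",), ("alice",)]
)

def vulnerable_endpoint(username):
    # Stand-in for a handler that concatenates input into SQL.
    rows = conn.execute(
        f"SELECT username FROM users WHERE username = '{username}'"
    ).fetchall()
    return str(rows)

def safe_endpoint(username):
    # Stand-in for a handler that binds input as a parameter.
    rows = conn.execute(
        "SELECT username FROM users WHERE username = ?", (username,)
    ).fetchall()
    return str(rows)

def looks_boolean_injectable(endpoint, base="nobody"):
    # Append an always-true and an always-false condition to the same value.
    true_body = endpoint(f"{base}' OR '1'='1")
    false_body = endpoint(f"{base}' AND '1'='2")
    # If the payloads are treated as data, both responses are identical.
    return true_body != false_body

print(looks_boolean_injectable(vulnerable_endpoint))
print(looks_boolean_injectable(safe_endpoint))
```

A real scanner would also have to account for responses that vary for unrelated reasons, such as timestamps or pagination, so a difference alone is a signal to investigate rather than proof.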
For Flask applications using SQLAlchemy, middleBrick's spec analysis feature can cross-reference your OpenAPI/Swagger definitions with runtime findings. If your spec shows a parameter that's used in database queries, middleBrick will specifically target that parameter with SQL injection payloads.
The scanner also detects Flask-specific patterns like:
- Endpoints using request.args, request.form, or request.json for database queries
- Dynamic SQL construction in route handlers
- Raw SQL execution with text() or direct connection objects
- Endpoints that accept table or column names as parameters
middleBrick's LLM security module adds another layer of detection for Flask applications that use AI features. If your Flask app has an LLM endpoint that processes user input and stores it in a database, middleBrick tests for prompt injection that could lead to SQL injection through the AI's output.
Flask-Specific Remediation
Remediating SQL injection in Flask applications requires using parameterized queries and avoiding dynamic SQL construction. Flask-SQLAlchemy provides several secure patterns that prevent injection:
from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///example.db'
db = SQLAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)

    def to_dict(self):
        # Serialize only the fields that are safe to expose
        return {'id': self.id, 'username': self.username, 'email': self.email}

@app.route('/search')
def search():
    username = request.args.get('username')
    # Secure: the ORM binds username as a query parameter
    user = User.query.filter_by(username=username).first()
    return jsonify(user.to_dict()) if user else ('Not found', 404)

@app.route('/profile')
def profile():
    # type=int yields None for non-numeric input instead of passing it through
    user_id = request.args.get('id', type=int)
    # Secure: the comparison value is bound as a parameter
    user = User.query.filter(User.id == user_id).first()
    return jsonify(user.to_dict()) if user else ('Not found', 404)
For raw SQL queries, always use parameterized statements:
from sqlalchemy import text

@app.route('/raw_search')
def raw_search():
    username = request.args.get('username')
    # Secure: :username is a bound parameter, not string interpolation
    query = text("SELECT * FROM users WHERE username = :username")
    result = db.session.execute(query, {'username': username})
    return jsonify([dict(row._mapping) for row in result])
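The effect of a bound parameter can be checked without Flask. The sketch below uses the standard-library sqlite3 module, which also accepts the :name placeholder style. The table contents are assumptions for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.executemany(
    "INSERT INTO users (username) VALUES (?)", [("admin",), ("alice",)]
)

def safe_search(username):
    # The SQL text is fixed; the value travels separately as a parameter.
    cursor = conn.execute(
        "SELECT username FROM users WHERE username = :username",
        {"username": username},
    )
    return [row[0] for row in cursor]

normal_rows = safe_search("admin")
attack_rows = safe_search("admin' OR '1'='1")

print(normal_rows)
print(attack_rows)
```

The injection payload is compared as a literal username, matches nothing, and returns no rows.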
When you need dynamic table or column names (best avoided when possible), validate them against an allowlist, because identifiers cannot be passed as bound parameters:
VALID_TABLES = {'users', 'orders', 'products'}
VALID_COLUMNS = {'id', 'username', 'email', 'created_at'}

@app.route('/data')
def data():
    table = request.args.get('table')
    column = request.args.get('column')
    if table not in VALID_TABLES or column not in VALID_COLUMNS:
        return 'Invalid table or column', 400
    # Identifiers are checked against the allowlist; values are still bound parameters
    query = text(f"SELECT {column} FROM {table} WHERE active = :active")
    result = db.session.execute(query, {'active': 1})
    return jsonify([dict(row._mapping) for row in result])
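One possible refinement is to key the allowed columns by table, so that a column valid for one table is not accepted for another. The sketch below uses the standard-library sqlite3 module. The ALLOWED mapping, the build_select helper, and the schema are hypothetical names introduced for illustration.

```python
import sqlite3

# Hypothetical allowlist: each table maps to the columns callers may select.
ALLOWED = {
    "users": {"id", "username", "email"},
    "orders": {"id", "created_at"},
}

def build_select(table, column):
    if table not in ALLOWED or column not in ALLOWED[table]:
        raise ValueError("invalid table or column")
    # Identifiers come only from the allowlist; the value uses a placeholder.
    return f'SELECT "{column}" FROM "{table}" WHERE active = ?'

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users ("
    "id INTEGER PRIMARY KEY, username TEXT, email TEXT, active INTEGER)"
)
conn.execute(
    "INSERT INTO users (username, email, active) "
    "VALUES ('alice', 'a@example.com', 1)"
)

rows = conn.execute(build_select("users", "username"), (1,)).fetchall()
print(rows)

try:
    build_select("users", "created_at")  # valid for orders, not for users
    rejected = False
except ValueError:
    rejected = True
print(rejected)
```

Raising an error for anything outside the mapping means an unexpected identifier never reaches the SQL string, whatever characters it contains.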
For complex queries, use SQLAlchemy's expression language:
@app.route('/advanced_search')
def advanced_search():
    username = request.args.get('username')
    email = request.args.get('email')
    filters = []
    if username:
        filters.append(User.username == username)
    if email:
        filters.append(User.email == email)
    # filter() ANDs its arguments together and also accepts an empty list
    query = User.query.filter(*filters)
    results = query.all()
    return jsonify([user.to_dict() for user in results])
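The same idea, optional conditions collected first and values bound separately, also works for hand-written SQL. This is a minimal sketch with the standard-library sqlite3 module. The build_user_search helper and the sample rows are assumptions for illustration.

```python
import sqlite3

def build_user_search(username=None, email=None):
    # Collect fixed SQL fragments and their values in parallel lists.
    clauses, params = [], []
    if username:
        clauses.append("username = ?")
        params.append(username)
    if email:
        clauses.append("email = ?")
        params.append(email)
    sql = "SELECT username, email FROM users"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    return sql, params

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (username TEXT, email TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [("alice", "a@example.com"), ("bob", "b@example.com")],
)

sql, params = build_user_search(username="alice", email="a@example.com")
matches = conn.execute(sql, params).fetchall()
print(sql)
print(matches)
```

Only the fragments written in the source code are ever joined into the statement; user input appears solely in the parameter list.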
Flask-WTF forms add defense in depth by validating input before it reaches your database queries. Validation complements parameterized queries; it does not replace them:
from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField
from wtforms.validators import DataRequired

# FlaskForm needs app.config['SECRET_KEY'] to be set for its CSRF token
class SearchForm(FlaskForm):
    username = StringField('Username', validators=[DataRequired()])
    submit = SubmitField('Search')

@app.route('/search', methods=['GET', 'POST'])
def search():
    form = SearchForm()
    # validate_on_submit() is False for GET, so GET requests fall through to 400
    if form.validate_on_submit():
        username = form.username.data
        user = User.query.filter_by(username=username).first()
        return jsonify(user.to_dict()) if user else ('Not found', 404)
    return 'Invalid input', 400
middleBrick's GitHub Action can automatically scan your Flask endpoints during CI/CD, ensuring SQL injection vulnerabilities are caught before deployment. The action can be configured to fail builds if critical vulnerabilities are detected:
- name: Scan API Security
  uses: middlebrick/middlebrick-action@v1
  with:
    api_url: http://localhost:5000
    fail_on_critical: true
    fail_on_high: true
Related CWEs: Input Validation
| CWE ID | Name | Severity |
|---|---|---|
| CWE-20 | Improper Input Validation | HIGH |
| CWE-22 | Path Traversal | HIGH |
| CWE-74 | Injection | CRITICAL |
| CWE-77 | Command Injection | CRITICAL |
| CWE-78 | OS Command Injection | CRITICAL |
| CWE-79 | Cross-site Scripting (XSS) | HIGH |
| CWE-89 | SQL Injection | CRITICAL |
| CWE-90 | LDAP Injection | HIGH |
| CWE-91 | XML Injection | HIGH |
| CWE-94 | Code Injection | CRITICAL |