Padding Oracle in Flask with CockroachDB
Padding Oracle in Flask with CockroachDB: how this specific combination creates or exposes the vulnerability
A padding oracle attack occurs when an application reveals whether decrypted ciphertext has valid padding, allowing an attacker to iteratively decrypt or forge messages without knowing the key. In a Flask application backed by CockroachDB, the risk arises when encrypted data is stored in CockroachDB and later decrypted in Flask, and the response (status code, message, or timing) discloses whether the padding was valid.
Consider a Flask route that fetches an encrypted record from CockroachDB by ID, decrypts it with AES in CBC mode, and returns a different HTTP status or message depending on whether decryption succeeds or raises a padding error:
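To make "valid padding" concrete, here is a minimal standard-library sketch of the PKCS#7 rule that the decryption step checks. The helper names `pkcs7_pad` and `pkcs7_is_valid` are illustrative and are not part of any library used in this article.

```python
BLOCK_SIZE = 16  # AES block size in bytes

def pkcs7_pad(data: bytes) -> bytes:
    # Append n bytes, each of value n, so the length becomes a multiple of 16
    n = BLOCK_SIZE - len(data) % BLOCK_SIZE
    return data + bytes([n]) * n

def pkcs7_is_valid(padded: bytes) -> bool:
    # Valid means: the last byte n is in 1..16 and the last n bytes all equal n
    if not padded or len(padded) % BLOCK_SIZE:
        return False
    n = padded[-1]
    return 1 <= n <= BLOCK_SIZE and padded.endswith(bytes([n]) * n)

padded = pkcs7_pad(b"id=42;role=user")   # 15 bytes, so one 0x01 byte is appended
tampered = padded[:-1] + b"\x07"         # last byte no longer matches the rule

print(pkcs7_is_valid(padded))    # True
print(pkcs7_is_valid(tampered))  # False
```

Any response that lets a client tell these two outcomes apart for a ciphertext of its choosing is the oracle.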
import base64

import psycopg2
from flask import Flask, jsonify
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend

app = Flask(__name__)
KEY = bytes(range(32))  # example key; in practice use a secure key store

def decrypt_pkcs7(ciphertext: bytes, key: bytes) -> bytes:
    # The stored blob is the 16-byte IV followed by the CBC ciphertext
    iv = ciphertext[:16]
    ct = ciphertext[16:]
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
    decryptor = cipher.decryptor()
    padded_plaintext = decryptor.update(ct) + decryptor.finalize()
    # finalize() on the unpadder raises ValueError if the padding is invalid
    unpadder = padding.PKCS7(128).unpadder()
    return unpadder.update(padded_plaintext) + unpadder.finalize()

@app.route("/record/<record_id>")
def get_record(record_id):
    conn = psycopg2.connect("dbname=cockroachdb user=app host=localhost sslmode=require")
    cur = conn.cursor()
    cur.execute("SELECT encrypted_data FROM records WHERE id = %s;", (record_id,))
    row = cur.fetchone()
    cur.close()
    conn.close()
    if row is None:
        return jsonify({"error": "not found"}), 404
    enc = base64.b64decode(row[0])
    try:
        plain = decrypt_pkcs7(enc, KEY)
        return jsonify({"data": plain.decode()})
    except ValueError as e:
        # VULNERABLE: returning 400 with the exception text leaks padding oracle information
        return jsonify({"error": "decryption failed", "details": str(e)}), 400
If the unpadding step raises ValueError for bad padding and Flask returns a 400 with a descriptive message, an attacker can distinguish valid padding from invalid padding. An attacker who can also influence the ciphertext being decrypted (for example, by writing to the records table or through another endpoint that feeds the same routine) can use that signal to decrypt data one byte at a time over many requests. The attack becomes more practical when the same key is used across records or when records contain structured, predictable plaintext (e.g., JSON with known fields). CockroachDB does not cause the oracle; the attack surface comes from storing encrypted blobs there and exposing padding errors when Flask decrypts them.
Additional risk patterns include using deterministic encryption or reusing nonces, which makes ciphertexts predictable, and relying on unauthenticated modes such as CBC without a MAC, which leaves them malleable. SSRF or unsafe consumption issues in Flask might let an attacker coerce the server into submitting crafted ciphertexts to the same decryption routine, amplifying the oracle. Because middleBrick tests input validation and unsafe consumption as part of its 12 checks, it can detect endpoints that expose padding-related errors or accept attacker-controlled ciphertexts.
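The byte-at-a-time recovery can be sketched with the standard library alone. This is an illustration under stated assumptions, not an attack on the route above: a toy Feistel cipher (`toy_encrypt_block`, `toy_decrypt_block`) stands in for AES so that nothing needs installing, `padding_oracle` stands in for an endpoint that answers 200 or 400, and the attacker is assumed to be able to submit ciphertext of their choosing. All names are hypothetical.

```python
import hashlib
import os

BLOCK = 16
SECRET_KEY = os.urandom(32)  # known only to the "server" side of this demo

def xor(a: bytes, b: bytes) -> bytes:
    return bytes(x ^ y for x, y in zip(a, b))

def _f(half: bytes, i: int) -> bytes:
    # Round function of the toy Feistel cipher (a stand-in for AES, demo only)
    return hashlib.sha256(SECRET_KEY + bytes([i]) + half).digest()[:8]

def toy_encrypt_block(b: bytes) -> bytes:
    l, r = b[:8], b[8:]
    for i in range(4):
        l, r = r, xor(l, _f(r, i))
    return l + r

def toy_decrypt_block(b: bytes) -> bytes:
    l, r = b[:8], b[8:]
    for i in reversed(range(4)):
        l, r = xor(r, _f(l, i)), l
    return l + r

def unpad(p: bytes) -> bytes:
    n = p[-1]
    if not 1 <= n <= BLOCK or p[-n:] != bytes([n]) * n:
        raise ValueError("bad padding")
    return p[:-n]

def cbc_encrypt(plaintext: bytes) -> bytes:
    n = BLOCK - len(plaintext) % BLOCK
    padded = plaintext + bytes([n]) * n
    prev = os.urandom(BLOCK)  # random IV, stored as the first block
    out = [prev]
    for i in range(0, len(padded), BLOCK):
        prev = toy_encrypt_block(xor(padded[i:i + BLOCK], prev))
        out.append(prev)
    return b"".join(out)

def padding_oracle(blob: bytes) -> bool:
    """Stand-in for the endpoint: True models a 200, False the padding-error 400."""
    blocks = [blob[i:i + BLOCK] for i in range(0, len(blob), BLOCK)]
    plain = b"".join(xor(toy_decrypt_block(c), p) for p, c in zip(blocks, blocks[1:]))
    try:
        unpad(plain)
        return True
    except ValueError:
        return False

def recover_block(prev: bytes, target: bytes) -> bytes:
    """Recover the plaintext of `target` using only padding_oracle answers."""
    inter = bytearray(BLOCK)  # block-cipher output for target, learned byte by byte
    for k in range(1, BLOCK + 1):          # k is the padding value being forced
        pos = BLOCK - k
        forged = bytearray(BLOCK)
        for j in range(pos + 1, BLOCK):
            forged[j] = inter[j] ^ k       # known bytes are made to decrypt to k
        for guess in range(256):
            forged[pos] = guess
            if not padding_oracle(bytes(forged) + target):
                continue
            if k == 1:
                # Rule out an accidental longer padding such as 02 02
                forged[pos - 1] ^= 0xFF
                still_valid = padding_oracle(bytes(forged) + target)
                forged[pos - 1] ^= 0xFF
                if not still_valid:
                    continue
            inter[pos] = guess ^ k
            break
    return xor(bytes(inter), prev)

secret = b"id=42;role=user;note=top-secret"
blob = cbc_encrypt(secret)
blocks = [blob[i:i + BLOCK] for i in range(0, len(blob), BLOCK)]
recovered = unpad(b"".join(recover_block(p, c) for p, c in zip(blocks, blocks[1:])))
print(recovered == secret)  # True, without ever reading SECRET_KEY in recover_block
```

Each block costs at most a few thousand oracle queries, which is why a single distinguishable error response is enough to make the attack practical.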
CockroachDB-Specific Remediation in Flask: concrete code fixes
Remediation focuses on ensuring that decryption does not leak padding validity and that data stored in CockroachDB is handled safely. Prefer authenticated encryption (e.g., AES-GCM) over raw AES-CBC: it provides integrity and needs no padding. If CBC is required, use a constant-time unpad and return a uniform error response for every padding or decryption failure.
Below is a revised Flask pattern that stores and retrieves encrypted data in CockroachDB using AES-GCM, avoiding padding issues:
import base64
import os

import psycopg2
from flask import Flask, jsonify
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

app = Flask(__name__)
# Example only: a key generated at startup is lost on restart; load it from a key store
KEY = AESGCM.generate_key(bit_length=256)

# Initialize table in CockroachDB (run once)
def init_db():
    conn = psycopg2.connect("dbname=cockroachdb user=app host=localhost sslmode=require")
    cur = conn.cursor()
    cur.execute("""
        CREATE TABLE IF NOT EXISTS records (
            id TEXT PRIMARY KEY,
            encrypted_data BYTEA NOT NULL
        )
    """)
    conn.commit()
    cur.close()
    conn.close()

init_db()

def encrypt_aes_gcm(plaintext: bytes, key: bytes) -> str:
    aesgcm = AESGCM(key)
    nonce = os.urandom(12)  # fresh 96-bit nonce for every encryption
    ct = aesgcm.encrypt(nonce, plaintext, associated_data=None)
    # Store nonce + ciphertext (the GCM tag is part of ct), base64-encoded
    return base64.b64encode(nonce + ct).decode()

def decrypt_aes_gcm(b64_blob: str, key: bytes) -> bytes:
    aesgcm = AESGCM(key)
    blob = base64.b64decode(b64_blob)
    nonce = blob[:12]
    ct = blob[12:]
    # Raises InvalidTag if the ciphertext or tag was modified
    return aesgcm.decrypt(nonce, ct, associated_data=None)

@app.route("/record/<record_id>")
def get_record(record_id):
    conn = psycopg2.connect("dbname=cockroachdb user=app host=localhost sslmode=require")
    cur = conn.cursor()
    cur.execute("SELECT encrypted_data FROM records WHERE id = %s;", (record_id,))
    row = cur.fetchone()
    cur.close()
    conn.close()
    if row is None:
        return jsonify({"error": "not found"}), 404
    try:
        plain = decrypt_aes_gcm(row[0], KEY)
        return jsonify({"data": plain.decode()})
    except Exception:
        # Do not distinguish between authentication, decoding, or any other failure
        return jsonify({"error": "decryption failed"}), 400
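If AES-CBC is required, catch every failure inside the decryption routine and return one generic error. This removes the explicit error signal; timing differences can remain unless the ciphertext is also authenticated before decryption. The handler below shows the generic-error part: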
import base64

from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend

# Reuses app, KEY, jsonify, and psycopg2 from the previous listing

def decrypt_pkcs7_constant_time(ciphertext: bytes, key: bytes) -> bytes:
    try:
        iv = ciphertext[:16]
        ct = ciphertext[16:]
        cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
        decryptor = cipher.decryptor()
        padded_plaintext = decryptor.update(ct) + decryptor.finalize()
        unpadder = padding.PKCS7(128).unpadder()
        return unpadder.update(padded_plaintext) + unpadder.finalize()
    except Exception:
        # Always raise the same generic error to avoid leaking padding info
        raise ValueError("decryption failed") from None

@app.route("/record-cbc/<record_id>")
def get_record_cbc(record_id):
    conn = psycopg2.connect("dbname=cockroachdb user=app host=localhost sslmode=require")
    cur = conn.cursor()
    cur.execute("SELECT encrypted_data FROM records WHERE id = %s;", (record_id,))
    row = cur.fetchone()
    cur.close()
    conn.close()
    if row is None:
        return jsonify({"error": "not found"}), 404
    try:
        # The stored value is base64-encoded, so decode it before decrypting
        plain = decrypt_pkcs7_constant_time(base64.b64decode(row[0]), KEY)
        return jsonify({"data": plain.decode()})
    except ValueError:
        return jsonify({"error": "decryption failed"}), 400
Additional recommendations: enforce integrity checks (HMAC or AEAD), rotate keys securely, and use HTTPS to protect ciphertext from tampering in transit. middleBrick’s Continuous Monitoring and CI/CD integration in the Pro plan can help detect endpoints that expose error details indicative of padding oracles.
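For the HMAC recommendation, one common arrangement is encrypt-then-MAC: compute a tag over the IV and ciphertext, and verify it before any decryption or unpadding happens. The following is a minimal standard-library sketch; `seal`, `open_sealed`, and the separate `mac_key` are hypothetical names introduced for illustration, and the random `blob` is only a placeholder for a real IV plus CBC ciphertext.

```python
import hashlib
import hmac
import os

TAG_LEN = 32  # HMAC-SHA256 output size in bytes

def seal(iv_and_ciphertext: bytes, mac_key: bytes) -> bytes:
    """Append an HMAC-SHA256 tag computed over IV + ciphertext."""
    tag = hmac.new(mac_key, iv_and_ciphertext, hashlib.sha256).digest()
    return iv_and_ciphertext + tag

def open_sealed(sealed: bytes, mac_key: bytes) -> bytes:
    """Return IV + ciphertext only if the tag verifies; otherwise raise ValueError."""
    if len(sealed) < TAG_LEN:
        raise ValueError("decryption failed")
    body, tag = sealed[:-TAG_LEN], sealed[-TAG_LEN:]
    expected = hmac.new(mac_key, body, hashlib.sha256).digest()
    # compare_digest avoids leaking how many tag bytes matched
    if not hmac.compare_digest(tag, expected):
        raise ValueError("decryption failed")
    return body

mac_key = os.urandom(32)   # assumed to be a second key, separate from the AES key
blob = os.urandom(48)      # placeholder for a real IV + CBC ciphertext
sealed = seal(blob, mac_key)
tampered = bytes([sealed[0] ^ 1]) + sealed[1:]

print(open_sealed(sealed, mac_key) == blob)  # True
try:
    open_sealed(tampered, mac_key)
except ValueError as exc:
    print(exc)  # decryption failed
```

In the CBC route, the output of `open_sealed` would be passed to the decryption function, so a forged ciphertext is rejected before it reaches the unpadder and both failure paths produce the same error.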