Brute Force Attack in Fastapi with Cockroachdb
Brute Force Attack in Fastapi with Cockroachdb — how this combination creates or exposes the vulnerability
A brute force attack against a Fastapi service backed by Cockroachdb typically exploits weak authentication endpoints or account enumeration issues. In this stack, attackers may send many credential guesses to the login route, relying on the absence of rate limiting or adaptive throttling. Cockroachdb, as a distributed SQL database, stores user credentials and session data; if Fastapi does not enforce strict lockout policies or constant-time comparison, attackers can infer valid usernames by observing differences in response behavior or timing.
The 12 security checks in middleBrick run in parallel and specifically evaluate Authentication and Rate Limiting for such scenarios. For example, without per-account rate limiting, an unauthenticated attacker can probe many usernames and passwords against the Fastapi endpoint. If the API returns distinct error messages for missing users versus incorrect passwords, this account enumeration aids iterative guessing. Cockroachdb’s consistency guarantees do not prevent this; the risk lies in how Fastapi constructs queries and handles failures. Queries like SELECT id, password_hash FROM users WHERE username = $1 become dangerous when the surrounding Fastapi logic leaks existence via status codes or response content. MiddleBrick’s Authentication check flags missing controls, while Rate Limiting checks identify insufficient request throttling.
Another angle involves IDOR/BOLA when object-level permissions are not enforced. If a Fastapi route accepts a user identifier and queries Cockroachdb without verifying that the authenticated subject owns that identifier, attackers can iterate through numeric or UUID identifiers to access or modify other users’ data. This is distinct from pure credential brute force but often coexists in APIs with weak authorization. The BOLA/IDOR check in middleBrick validates that references are scoped correctly and that tenant boundaries are enforced in queries. Data Exposure and Encryption checks further ensure that credentials are never returned in logs or error responses and that TLS is correctly negotiated between Fastapi and Cockroachdb.
Because middleBrick scans the unauthenticated attack surface and supports OpenAPI/Swagger spec analysis with full $ref resolution, it can correlate declared authentication schemes with observed runtime behavior. If the spec lacks security schemes or the implementation deviates, findings highlight gaps that facilitate brute force or enumeration. Instrumenting your API with middleBrick helps detect these patterns before attackers succeed, providing prioritized findings with severity and remediation guidance aligned to frameworks like OWASP API Top 10 and PCI-DSS.
Cockroachdb-Specific Remediation in Fastapi — concrete code fixes
Remediation focuses on reducing information leakage, enforcing rate limits, and validating ownership for every database operation. Below are concrete Fastapi patterns with Cockroachdb that address brute force risks.
- Use constant-time comparison and generic error messages to prevent account enumeration:
from fastapi import FastAPI, HTTPException, Depends, status
from pydantic import BaseModel
import bcrypt
import secrets
app = FastAPI()
class LoginRequest(BaseModel):
username: str
password: str
def get_user(username: str):
# Example using Cockroachdb with psycopg3 connection
import psycopg
conn = psycopg.connect("postgresql://user:pass@host:26257/db?sslmode=require")
with conn.cursor() as cur:
cur.execute("SELECT id, password_hash FROM users WHERE username = $1", (username,))
row = cur.fetchone()
conn.close()
return row
@app.post("/login")
async def login(payload: LoginRequest):
row = get_user(payload.username)
# Always run hash comparison to avoid timing leaks
stored_hash = row[1] if row else b"$2b$12$dummy"
# Use secrets.compare_digest for constant-time check
if not secrets.compare_digest(stored_hash, bcrypt.hashpw(payload.password.encode(), stored_hash)):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
# On success, return a token (implementation omitted)
return {"token": "example-jwt-token"}
- Apply per-user and global rate limiting with slow-start behavior:
from fastapi import FastAPI, Request, HTTPException, status
from fastapi.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
import time
import psycopg
app = FastAPI()
class RateLimitMiddleware(BaseHTTPMiddleware):
def __init__(self, app, max_attempts=5, window=60):
super().__init__(app)
self.max_attempts = max_attempts
self.window = window
# Use Cockroachdb to store attempt counters with TTL-like cleanup
async def dispatch(self, request: Request, call_next):
if request.url.path == "/login":
ip = request.client.host
username = request.form().get("username") if request.method == "POST" else "unknown"
key = f"rate:{ip}:{username}"
conn = psycopg.connect("postgresql://user:pass@host:26257/db?sslmode=require")
with conn.cursor() as cur:
cur.execute(
"INSERT INTO rate_log (key, count, created_at) VALUES ($1, 1, now()) ON CONFLICT (key) DO UPDATE SET count = rate_log.count + 1, created_at = now() WHERE rate_log.created_at > NOW() - INTERVAL '1 minute'",
(key,)
)
cur.execute("SELECT count FROM rate_log WHERE key = $1 AND created_at > NOW() - INTERVAL '1 minute'", (key,))
row = cur.fetchone()
conn.close()
if row and row[0] > self.max_attempts:
raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many attempts")
response = await call_next(request)
return response
app.add_middleware(RateLimitMiddleware)
- Enforce ownership checks with Cockroachdb row-level security patterns in Fastapi routes:
from fastapi import Depends, HTTPException, status
import psycopg
def get_db():
conn = psycopg.connect("postgresql://user:pass@host:26257/db?sslmode=require")
return conn
def ensure_own_profile(request, db=Depends(get_db)):
# Assuming user identity is resolved earlier and attached to request state
target_id = request.path_params.get("user_id")
subject_id = request.state.user_id # from auth dependency
with db.cursor() as cur:
cur.execute("SELECT 1 FROM profiles WHERE id = $1 AND owner_id = $2", (target_id, subject_id))
if not cur.fetchone():
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden")
return db
@app.get("/profiles/{user_id}")
def read_profile(user_id: str, db: psycopg.Connection = Depends(get_db), _=Depends(ensure_own_profile)):
with db.cursor() as cur:
cur.execute("SELECT display_name FROM profiles WHERE id = $1 AND owner_id = $2", (user_id, request.state.user_id))
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Not found")
return {"display_name": row[0]}
These examples show how to combine Fastapi application logic with Cockroachdb queries to mitigate brute force risks. MiddleBrick’s checks for Authentication, Rate Limiting, BOLA/IDOR, Data Exposure, and Encryption help validate that such controls are present and correctly observed during scanning.