API Rate Abuse in FastAPI with DynamoDB
API Rate Abuse in FastAPI with DynamoDB — how this specific combination creates or exposes the vulnerability
Rate abuse in a FastAPI service backed by DynamoDB typically occurs when an API lacks effective rate limiting, allowing an attacker to issue a high volume of requests that consume backend resources and degrade availability. FastAPI does not enforce request-rate controls by default, so developers must add explicit mechanisms. DynamoDB, while scalable, can become a bottleneck under abusive traffic: on-demand tables translate every repeated query directly into billed read/write request units, and provisioned tables throttle once consumption exceeds the configured RCU/WCU. A common anti-pattern is performing strongly consistent reads or writes on every request without short-circuiting or caching, which inflates consumed capacity and can trigger ProvisionedThroughputExceededException errors. Attackers can exploit this to amplify cost under pay-per-request billing, exhaust burst capacity, or hit account-level limits. The combination is also observable in open API/Swagger specs: paths rarely declare expected rate constraints, so unauthenticated scanning can easily discover endpoints and hammer them. middleBrick scans such unauthenticated attack surfaces and flags missing rate limiting as a finding, mapping it to the OWASP API Security Top 10 and highlighting the risk to availability and cost control.
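To see the amplification concretely, here is a back-of-envelope sketch. The per-read capacity figures are DynamoDB's documented unit sizes (a strongly consistent read of an item up to 4 KB consumes 1 RCU; an eventually consistent read consumes 0.5); the traffic volume is illustrative:

```python
# Cost amplification: each strongly consistent GetItem on an item up to 4 KB
# consumes 1 RCU, while an eventually consistent read consumes 0.5 RCU.
# Under on-demand billing, request volume maps directly to billed read units.

def consumed_rcus(requests: int, item_kb: float, consistent: bool) -> float:
    """Read capacity units consumed by `requests` GetItem calls."""
    units_per_read = -(-item_kb // 4)   # ceil(item_kb / 4): number of 4 KB units
    if not consistent:
        units_per_read *= 0.5           # eventual consistency halves consumption
    return requests * units_per_read

# 1M abusive requests against a 3 KB item:
strong = consumed_rcus(1_000_000, 3, consistent=True)     # 1,000,000 RCUs
eventual = consumed_rcus(1_000_000, 3, consistent=False)  # 500,000 RCUs
```

An unthrottled endpoint doing a strongly consistent read per request therefore hands an attacker a direct cost and capacity lever; eventual consistency alone halves it, but only rate limiting bounds it.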
DynamoDB-Specific Remediation in FastAPI — concrete code fixes
Implement server-side rate limiting in FastAPI and couple it with DynamoDB patterns that reduce excessive consumption. Use a token-bucket or sliding-window algorithm stored in a fast, shared data store (e.g., Redis) to enforce per-user or per-IP limits before requests ever hit DynamoDB. On the DynamoDB side, prefer query patterns with efficient indexes, short TTL caching, and conditional writes to avoid unnecessary capacity usage. Below are two concrete examples of FastAPI endpoints that combine Redis-based rate limiting with boto3 access to DynamoDB, including error handling for throttling.
Example 1: Rate-limited DynamoDB read using eventually consistent queries
```python
from fastapi import FastAPI, HTTPException, Depends, Request
import boto3
import os
from botocore.exceptions import ClientError
from redis import asyncio as aioredis  # redis-py asyncio API (the aioredis package is deprecated)

app = FastAPI()

# Use environment variables in production
redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://localhost"))
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
table = dynamodb.Table(os.getenv("TABLE_NAME", "Items"))

RATE_LIMIT = 60       # requests per WINDOW_SECONDS
WINDOW_SECONDS = 60


async def get_redis():
    # Simple connection helper; redis-py manages a connection pool internally
    return redis


async def rate_limited(request: Request, limit: int = RATE_LIMIT, window: int = WINDOW_SECONDS) -> bool:
    """Return True if allowed, False if rate-limited."""
    client = await get_redis()
    key = f"ratelimit:{request.client.host}"
    try:
        # INCR is atomic, so concurrent requests cannot race past the limit
        current = await client.incr(key)
        if current == 1:
            await client.expire(key, window)  # start the window on the first hit
        return current <= limit
    except Exception:
        # Fail open: allow the request if Redis is unavailable, but log in production
        return True


@app.get("/items/{item_id}")
async def read_item(item_id: str, allowed: bool = Depends(rate_limited)):
    if not allowed:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    try:
        # Eventually consistent reads cost half the RCUs; use them where acceptable
        response = table.get_item(Key={"id": item_id}, ConsistentRead=False)
    except ClientError as e:
        if e.response["Error"]["Code"] == "ProvisionedThroughputExceededException":
            raise HTTPException(status_code=503, detail="Service temporarily overloaded")
        raise HTTPException(status_code=500, detail="DynamoDB error")
    item = response.get("Item")
    if not item:
        raise HTTPException(status_code=404, detail="Item not found")
    return item
```
Example 2: Conditional write with short TTL caching to reduce write amplification
```python
import json

CACHE_TTL = 30  # seconds to cache recent duplicate lookups


async def get_cached(key: str):
    client = await get_redis()
    cached = await client.get(key)
    return json.loads(cached) if cached else None


async def set_cached(key: str, value, ttl: int = CACHE_TTL):
    client = await get_redis()
    await client.set(key, json.dumps(value), ex=ttl)


@app.post("/record")
async def record_event(payload: dict, allowed: bool = Depends(rate_limited)):
    if not allowed:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    # Short-circuit duplicate events before they reach DynamoDB
    cache_key = f"recent:{payload.get('user_id')}:{payload.get('event_type')}"
    if await get_cached(cache_key):
        return {"skipped": True, "reason": "recent_duplicate"}
    try:
        # Conditional write: succeed only if no item with this event_id exists,
        # making the write idempotent. The payload must include the table's
        # primary key attributes.
        table.put_item(
            Item=payload,
            ConditionExpression="attribute_not_exists(event_id)",
        )
        await set_cached(cache_key, payload)
        return {"written": True}
    except ClientError as e:
        if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return {"skipped": True, "reason": "already_exists"}
        if e.response["Error"]["Code"] == "ProvisionedThroughputExceededException":
            raise HTTPException(status_code=503, detail="Service temporarily overloaded")
        raise HTTPException(status_code=500, detail="DynamoDB error")
```
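Finally, the fail-open branch in `rate_limited` can be tightened. Rather than allowing everything when Redis is unreachable, a small in-process token bucket can serve as a degraded fallback. This is a sketch: it is not shared across workers, so it only bounds abuse per process:

```python
import time


class TokenBucket:
    """Refill `rate` tokens per second up to `capacity`; each request costs one token."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.updated = time.monotonic()

    def allow(self, cost: float = 1.0) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


# Example: allow bursts of 5 requests, refilling one request per second thereafter
fallback_bucket = TokenBucket(rate=1.0, capacity=5)
```

In `rate_limited`, the `except` branch could call `fallback_bucket.allow()` instead of returning `True` unconditionally, keeping some protection during a Redis outage.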
These examples emphasize fast pre-checks that avoid unnecessary DynamoDB calls, reducing both the impact of rate abuse and the pressure on provisioned capacity. middleBrick can detect endpoints missing rate limiting and map findings to compliance frameworks, providing remediation guidance without itself modifying or blocking traffic.