Token Blocklist#
When a user logs out, you need to invalidate their token. Since JWTs are stateless, you need a blocklist to track revoked tokens.
Complete Example#
from typing import Optional
from fastapi import FastAPI, Depends, HTTPException
from authx import AuthX, AuthXConfig, RequestToken, TokenPayload
app = FastAPI()
config = AuthXConfig(
JWT_SECRET_KEY="your-secret-key",
JWT_TOKEN_LOCATION=["headers"],
)
auth = AuthX(config=config)
auth.handle_errors(app)
# Simple in-memory blocklist (use Redis/database in production)
BLOCKLIST: set[str] = set()
@auth.set_callback_token_blocklist
def is_token_revoked(token: str) -> bool:
"""Return True if token is revoked."""
return token in BLOCKLIST
@app.post("/login")
def login(username: str, password: str):
if username == "test" and password == "test":
token = auth.create_access_token(uid=username)
return {"access_token": token}
raise HTTPException(401, detail="Invalid credentials")
@app.post("/logout", dependencies=[Depends(auth.access_token_required)])
async def logout(request):
"""Add current token to blocklist."""
token = await auth.get_access_token_from_request(request)
BLOCKLIST.add(token.token)
return {"message": "Logged out"}
@app.get("/protected", dependencies=[Depends(auth.access_token_required)])
def protected():
return {"message": "Access granted"}
How It Works#
1. Define the Blocklist Callback#
Create a function that checks if a token is revoked:
@auth.set_callback_token_blocklist
def is_token_revoked(token: str) -> bool:
"""Return True if token is revoked, False otherwise."""
return token in BLOCKLIST
This callback runs automatically whenever a protected route validates a token.
2. Add Tokens to Blocklist on Logout#
@app.post("/logout", dependencies=[Depends(auth.access_token_required)])
async def logout(request):
token = await auth.get_access_token_from_request(request)
BLOCKLIST.add(token.token)
return {"message": "Logged out"}
Testing#
# 1. Login
curl -X POST "http://localhost:8000/login?username=test&password=test"
# {"access_token": "eyJ..."}
# 2. Access protected route - WORKS
curl -H "Authorization: Bearer <token>" http://localhost:8000/protected
# {"message": "Access granted"}
# 3. Logout
curl -X POST -H "Authorization: Bearer <token>" http://localhost:8000/logout
# {"message": "Logged out"}
# 4. Try to access protected route again - FAILS
curl -H "Authorization: Bearer <token>" http://localhost:8000/protected
# {"message": "Invalid token", "error_type": "RevokedTokenError"}
Production: Using a Database#
For production, store revoked tokens in a database. Here's an example with SQLAlchemy:
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import create_engine, Column, String, DateTime
from sqlalchemy.orm import sessionmaker, declarative_base
from datetime import datetime, timezone
from authx import AuthX, AuthXConfig, TokenPayload
app = FastAPI()
config = AuthXConfig(
JWT_SECRET_KEY="your-secret-key",
JWT_TOKEN_LOCATION=["headers"],
)
auth = AuthX(config=config)
auth.handle_errors(app)
# Database setup
Base = declarative_base()
engine = create_engine("sqlite:///./tokens.db")
SessionLocal = sessionmaker(bind=engine)
class RevokedToken(Base):
__tablename__ = "revoked_tokens"
token = Column(String, primary_key=True)
revoked_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
Base.metadata.create_all(bind=engine)
@auth.set_callback_token_blocklist
def is_token_revoked(token: str) -> bool:
with SessionLocal() as db:
return db.query(RevokedToken).filter_by(token=token).first() is not None
@app.post("/login")
def login(username: str, password: str):
if username == "test" and password == "test":
return {"access_token": auth.create_access_token(uid=username)}
raise HTTPException(401, detail="Invalid credentials")
@app.post("/logout", dependencies=[Depends(auth.access_token_required)])
async def logout(request):
token = await auth.get_access_token_from_request(request)
with SessionLocal() as db:
db.add(RevokedToken(token=token.token))
db.commit()
return {"message": "Logged out"}
@app.get("/protected", dependencies=[Depends(auth.access_token_required)])
def protected():
return {"message": "Access granted"}
Production: Using Redis#
For high-performance applications, use Redis:
import redis
from authx import AuthX, AuthXConfig
auth = AuthX(config=AuthXConfig(JWT_SECRET_KEY="your-secret-key"))
redis_client = redis.Redis(host="localhost", port=6379, db=0)
@auth.set_callback_token_blocklist
def is_token_revoked(token: str) -> bool:
return redis_client.exists(f"revoked:{token}") > 0
def revoke_token(token: str, expires_in: int = 86400):
"""Add token to blocklist with expiration."""
redis_client.setex(f"revoked:{token}", expires_in, "1")
Token Expiration
Only store tokens in the blocklist until they would naturally expire. This keeps the blocklist small.
Important Notes#
Revoke Both Tokens
When logging out, revoke both access and refresh tokens: