Think of an API (Application Programming Interface) as a digital gateway or a counter at a bank. It's the public-facing access point that allows other applications to request data or perform actions. Just as a bank needs security to protect its vaults and serve the right customers, an API needs security to protect an organization's data and services.
APIs are the pipelines to a company's most valuable asset: its data. An insecure API is an open invitation for a data breach, exposing sensitive information like user credentials, personal details, financial records, and private messages. Strong security is the first and last line of defense against this theft.
Security isn't just about keeping everyone out; it's about letting the right people in and limiting what they can do. This is a two-part process: authentication (confirming who is making a request) and authorization (deciding what that caller is allowed to do).
A single, high-profile API breach can be catastrophic for a business. It instantly erodes customer trust, which is incredibly difficult to rebuild. This loss of trust leads to customer churn, negative press, and severe damage to a brand's reputation. Furthermore, failing to secure data can result in massive legal and financial penalties under regulations such as GDPR, HIPAA, or CCPA.
Beyond data theft, API security protects the availability and integrity of the service itself. Without security, malicious actors can flood the API with requests or send malformed input that degrades the service for everyone else.
Security measures like rate-limiting (limiting how many requests a user can make per minute) and input validation (blocking malformed requests) are essential to keep the API stable and reliable for legitimate users.
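To make the rate-limiting idea concrete, here is a minimal sliding-window sketch using only the standard library. The class name `SlidingWindowRateLimiter`, the per-client key, and the limit of 3 requests per 60 seconds are illustrative assumptions, not part of this project's code; a production service would more likely use a shared store or a gateway-level limiter.

```python
import time
from collections import defaultdict, deque
from typing import Optional


class SlidingWindowRateLimiter:
    """Allow at most `max_requests` per client within `window_seconds`."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # Maps a client key (e.g. a user ID or IP) to timestamps of recent requests.
        self._hits = defaultdict(deque)

    def allow(self, client_key: str, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        hits = self._hits[client_key]
        # Drop timestamps that have fallen out of the window.
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False  # Over the limit: the caller should respond with HTTP 429.
        hits.append(now)
        return True


# Hypothetical usage: 3 requests per 60 seconds, with explicit timestamps.
limiter = SlidingWindowRateLimiter(max_requests=3, window_seconds=60)
results = [limiter.allow("alice", now=t) for t in (0, 1, 2, 3, 61)]
print(results)  # [True, True, True, False, True]
```

The fourth request is refused because three requests already sit inside the window; by second 61 the two oldest have expired, so the client is allowed again.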
Ultimately, API security is not an optional feature. It is a fundamental, non-negotiable requirement for protecting data, maintaining user trust, and ensuring a service remains reliable.
# main.py
from fastapi import FastAPI, HTTPException, status, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List, Optional
from contextlib import asynccontextmanager
from enum import Enum

from . import crud, schemas, models
from .database import get_async_session, create_db_and_tables, AsyncSessionFactory
from .security import get_current_user, get_current_admin_user
from . import auth


# --- Enums for Sorting ---
class SortableFields(str, Enum):
    emp_id = "emp_id"
    emp_name = "emp_name"
    emp_dob = "emp_dob"
    city = "city"
    country = "country"


class SortOrder(str, Enum):
    asc = "asc"
    desc = "desc"


@asynccontextmanager
async def lifespan(app: FastAPI):
    print("Creating database and tables...")
    await create_db_and_tables()
    print("Database and tables created.")

    print("Checking for admin user...")
    async with AsyncSessionFactory() as session:
        admin_user = await crud.get_user_by_username(session, "admin")
        if not admin_user:
            print("Admin user not found, creating one...")
            admin_user_data = schemas.UserCreate(
                username="admin",
                password="adminpassword"
            )
            await crud.create_admin_user(session, admin_user_data)
            print("Admin user 'admin' created.")
        else:
            print("Admin user already exists.")

    yield


app = FastAPI(
    title="Employee Management API (Production Ready)",
    description="A secure, high-performance API using SQLModel, PostgreSQL, OAuth2, and Auth scopes.",
    lifespan=lifespan,
)

app.include_router(auth.router)


# --- API Endpoints ---
@app.get("/")
def read_root():
    return {"message": "Welcome to the Employee Management API."}


@app.post(
    "/employees/",
    response_model=schemas.EmployeeRead,
    status_code=status.HTTP_201_CREATED,
    tags=["Employees"]
)
async def create_employee_endpoint(
    employee_input: schemas.EmployeeCreate,
    db: AsyncSession = Depends(get_async_session),
    current_admin: models.User = Depends(get_current_admin_user)
):
    """Create a new employee record. (Admin Only)"""
    return await crud.create_employee(db=db, employee=employee_input)


@app.get("/employees/", response_model=List[schemas.EmployeeRead], tags=["Employees"])
async def get_all_employees_endpoint(
    db: AsyncSession = Depends(get_async_session),
    current_user: models.User = Depends(get_current_user),
    offset: int = 0,
    limit: int = 100,
    city: Optional[str] = None,
    country: Optional[str] = None,
    sort_by: SortableFields = SortableFields.emp_id,
    order: SortOrder = SortOrder.asc
):
    """
    Retrieve all employees. (Authenticated Users Only)
    Supports pagination, filtering by city/country, and sorting.
    """
    return await crud.get_all_employees(
        db=db,
        offset=offset,
        limit=limit,
        city=city,
        country=country,
        sort_by=sort_by.value,
        order=order.value
    )


@app.get("/employees/{emp_id}", response_model=schemas.EmployeeRead, tags=["Employees"])
async def get_employee_by_emp_id_endpoint(
    emp_id: int,
    db: AsyncSession = Depends(get_async_session),
    current_user: models.User = Depends(get_current_user)
):
    """Retrieve an employee by their emp_id. (Authenticated Users Only)"""
    employee = await crud.get_employee_by_emp_id(db, emp_id)
    if not employee:
        raise HTTPException(
            status_code=404,
            detail=f"Employee with emp_id '{emp_id}' not found"
        )
    return employee


@app.patch("/employees/{emp_id}", response_model=schemas.EmployeeRead, tags=["Employees"])
async def update_employee_by_emp_id_endpoint(
    emp_id: int,
    updated_details: schemas.EmployeeUpdate,
    db: AsyncSession = Depends(get_async_session),
    current_admin: models.User = Depends(get_current_admin_user)
):
    """
    Updates the record for a specific employee. (Admin Only)
    This uses PATCH logic (only updates provided fields).
    """
    updated_employee = await crud.update_employee(
        db=db,
        emp_id=emp_id,
        employee_update=updated_details
    )
    if not updated_employee:
        raise HTTPException(
            status_code=404,
            detail=f"Employee with emp_id '{emp_id}' not found"
        )
    return updated_employee


@app.delete(
    "/employees/{emp_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    tags=["Employees"]
)
async def delete_employee_by_emp_id_endpoint(
    emp_id: int,
    db: AsyncSession = Depends(get_async_session),
    current_admin: models.User = Depends(get_current_admin_user)
):
    """Delete an employee using their emp_id. (Admin Only)"""
    success = await crud.delete_employee(db=db, emp_id=emp_id)
    if not success:
        raise HTTPException(
            status_code=404,
            detail=f"Employee with emp_id '{emp_id}' not found"
        )
    return None
# database.py
import os
from typing import AsyncGenerator

from sqlmodel import SQLModel
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from dotenv import load_dotenv

load_dotenv()

DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
    raise ValueError("DATABASE_URL environment variable is not set")

# Create the async engine
engine = create_async_engine(DATABASE_URL, echo=True, future=True)

# Session factory for creating new async sessions
AsyncSessionFactory = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Good practice for async
)


async def create_db_and_tables():
    """Initializes the database tables."""
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)


# The function yields, so it is an async generator rather than a plain coroutine
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
    """FastAPI dependency to get an async database session."""
    async with AsyncSessionFactory() as session:
        yield session
# models.py
import uuid
from sqlmodel import SQLModel, Field
from datetime import date
from typing import Optional


class Employee(SQLModel, table=True):
    id: Optional[uuid.UUID] = Field(
        default_factory=uuid.uuid4,
        primary_key=True,
        index=True
    )
    emp_id: int = Field(unique=True, index=True)
    emp_name: str
    city: str
    country: str
    emp_dob: date


class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    username: str = Field(unique=True, index=True)
    hashed_password: str
    is_admin: bool = Field(default=False)
# crud.py
from sqlmodel import SQLModel, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import IntegrityError
from fastapi import HTTPException, status
from .models import Employee, User
from .schemas import EmployeeCreate, EmployeeUpdate, UserCreate
from typing import List, Optional
from .security import get_password_hash


# --- Employee CRUD ---
async def create_employee(db: AsyncSession, employee: EmployeeCreate) -> Employee:
    existing = await get_employee_by_emp_id(db, employee.emp_id)
    if existing:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Employee business ID '{employee.emp_id}' already exists"
        )
    db_employee = Employee.model_validate(employee)
    try:
        db.add(db_employee)
        await db.commit()
        await db.refresh(db_employee)
        return db_employee
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Conflict: A race condition occurred or emp_id was just created."
        )


async def get_employee_by_emp_id(db: AsyncSession, emp_id: int) -> Optional[Employee]:
    statement = select(Employee).where(Employee.emp_id == emp_id)
    result = await db.execute(statement)
    return result.scalars().first()


async def get_all_employees(
    db: AsyncSession,
    offset: int,
    limit: int,
    city: Optional[str] = None,
    country: Optional[str] = None,
    sort_by: str = "emp_id",
    order: str = "asc"
) -> List[Employee]:
    statement = select(Employee)
    if city:
        statement = statement.where(Employee.city.ilike(f"%{city}%"))
    if country:
        statement = statement.where(Employee.country.ilike(f"%{country}%"))
    sort_column = getattr(Employee, sort_by, Employee.emp_id)
    if order.lower() == "desc":
        statement = statement.order_by(sort_column.desc())
    else:
        statement = statement.order_by(sort_column.asc())
    statement = statement.offset(offset).limit(limit)
    result = await db.execute(statement)
    return result.scalars().all()


async def update_employee(
    db: AsyncSession,
    emp_id: int,
    employee_update: EmployeeUpdate
) -> Optional[Employee]:
    db_employee = await get_employee_by_emp_id(db, emp_id)
    if not db_employee:
        return None
    update_data = employee_update.model_dump(exclude_unset=True)
    for key, value in update_data.items():
        setattr(db_employee, key, value)
    db.add(db_employee)
    await db.commit()
    await db.refresh(db_employee)
    return db_employee


async def delete_employee(db: AsyncSession, emp_id: int) -> bool:
    db_employee = await get_employee_by_emp_id(db, emp_id)
    if not db_employee:
        return False
    await db.delete(db_employee)
    await db.commit()
    return True


# --- User CRUD ---
async def get_user_by_username(db: AsyncSession, username: str) -> Optional[User]:
    statement = select(User).where(User.username == username)
    result = await db.execute(statement)
    return result.scalars().first()


async def create_user(db: AsyncSession, user: UserCreate) -> User:
    existing = await get_user_by_username(db, user.username)
    if existing:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Username already exists"
        )
    hashed_password = get_password_hash(user.password)
    db_user = User(
        username=user.username,
        hashed_password=hashed_password,
        is_admin=False  # Regular users are not admin
    )
    db.add(db_user)
    await db.commit()
    await db.refresh(db_user)
    return db_user


async def create_admin_user(db: AsyncSession, user: UserCreate) -> User:
    """A special function to create an admin user."""
    if await get_user_by_username(db, user.username):
        raise HTTPException(status_code=409, detail="Username exists")
    hashed_password = get_password_hash(user.password)
    db_user = User(
        username=user.username,
        hashed_password=hashed_password,
        is_admin=True
    )
    db.add(db_user)
    await db.commit()
    await db.refresh(db_user)
    return db_user
# security.py
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from dotenv import load_dotenv

from . import schemas, crud
from .database import get_async_session
from .models import User

load_dotenv()

# --- CONFIGURATION ---
SECRET_KEY = os.getenv("JWT_SECRET_KEY")
# Falls back to HS256 when JWT_ALGORITHM is unset (an assumed default),
# so jwt.encode/jwt.decode never receive None as the algorithm
ALGORITHM = os.getenv("JWT_ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", 30))

if not SECRET_KEY:
    raise ValueError("JWT_SECRET_KEY environment variable is not set")

# Password Hashing Context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# OAuth2 Scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")


# --- PASSWORD UTILITIES ---
def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)


# --- JWT TOKEN UTILITIES ---
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


# --- AUTHENTICATION & DEPENDENCIES ---
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_async_session)
) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = schemas.TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = await crud.get_user_by_username(db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_admin_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """
    A dependency that checks if the current user is an admin.
    """
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="The user does not have administrative privileges"
        )
    return current_user
The project is organized into a main application package (app) and a root-level directory for configuration and testing. This separation of concerns (SoC) is standard for professional FastAPI applications.
project_root/
├── .venv/                  # Virtual environment folder
├── app/                    # Main application package
│   ├── __init__.py         # Makes 'app' a Python package
│   ├── main.py             # FastAPI app object, endpoints, lifespan
│   ├── auth.py             # /auth/token and /auth/register routes
│   ├── crud.py             # Database logic (Create, Read, Update, Delete)
│   ├── database.py         # Database connection (engine, session)
│   ├── models.py           # Database table definitions (SQLModel)
│   ├── schemas.py          # API data validation schemas (Pydantic/SQLModel)
│   └── security.py         # Auth logic (JWT, password hashing, dependencies)
│
├── tests/                  # Application test suite
│   ├── __init__.py         # Makes 'tests' a package
│   └── test_api.py         # All API integration tests
│
├── .env                    # Stores secrets (DB URL, JWT key)
├── conftest.py             # Pytest setup and fixtures (e.g., test client)
└── requirements.txt        # All project dependencies
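The API is built on a modern, asynchronous Python stack: FastAPI for the web layer, SQLModel on SQLAlchemy's async engine for data access, PostgreSQL for storage, python-jose and passlib for JWT tokens and password hashing, and pytest for testing.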
Security is a core feature, split into two parts: proving who you are (Authentication) and what you're allowed to do (Authorization).
The API uses the standard OAuth2 Password Flow for user login.
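- The client sends a username and password to the /auth/token endpoint.
- The server verifies them against the User table (using passlib for secure password hashing).
- If the credentials are valid, the server returns a signed JWT access token.
- The client sends that token in an Authorization: Bearer <token> header for all future requests to secure endpoints.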
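The token step of this flow can be illustrated without any third-party packages. The sketch below signs and verifies an HS256 token by hand using the standard library; the project itself delegates this to python-jose, so `sign_token`, `verify_token`, and the demo secret are hypothetical names used only to show what happens under the hood.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()


def sign_token(claims: dict, secret: str) -> str:
    """Build header.payload.signature, the three dot-separated JWT parts."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, claims)
    )
    return signing_input + "." + _b64url(_signature(signing_input, secret))


def verify_token(token: str, secret: str, now: Optional[float] = None) -> dict:
    """Return the claims if the signature is valid and the token has not expired."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("Malformed token")
    header_b64, payload_b64, signature_b64 = parts
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("Unexpected algorithm")
    expected = _signature(f"{header_b64}.{payload_b64}", secret)
    # compare_digest avoids leaking information through timing differences.
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("Bad signature")
    claims = json.loads(_b64url_decode(payload_b64))
    if claims.get("exp", 0) <= (time.time() if now is None else now):
        raise ValueError("Token expired")
    return claims


# Hypothetical usage with fixed timestamps so the result is deterministic.
token = sign_token({"sub": "alice", "exp": 1_000 + 1_800}, "demo-secret")
claims = verify_token(token, "demo-secret", now=1_000)
print(claims["sub"])  # alice
```

Because the signature covers both the header and the payload, a client cannot change the `sub` or `exp` claims without invalidating the token, which is what lets the server trust it on later requests without a session lookup.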
Not all users are equal. The User model has an is_admin flag to enforce different permissions. This is handled by FastAPI dependencies.
- Regular users can read data (GET /employees/).
- Admin users can also create, modify, and delete data (POST, PATCH, DELETE).
This is enforced in the code by using a specific dependency. Any endpoint that requires an admin will fail with a 403 Forbidden error if a regular user tries to access it.
# This dependency checks for a valid token AND the is_admin flag
from .security import get_current_admin_user

@app.post("/employees/", tags=["Employees"])
async def create_employee_endpoint(
    employee_input: schemas.EmployeeCreate,
    db: AsyncSession = Depends(get_async_session),
    current_admin: models.User = Depends(get_current_admin_user)
):
    # This code will only run if the user is a logged-in admin
    return await crud.create_employee(db=db, employee=employee_input)
The API is designed to handle a large number of requests and a large amount of data efficiently.
The entire application is async, from the API endpoints down to the database queries. This means the server can handle many concurrent connections without getting stuck waiting for one slow operation to finish.
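As a rough illustration of why this matters, the sketch below runs three simulated queries concurrently. `fake_query` and its 0.2 second delay are stand-ins (not project code) for an awaited database call; the point is that the total time is close to one delay rather than the sum of all three.

```python
import asyncio
import time


async def fake_query(name: str, delay: float) -> str:
    # asyncio.sleep stands in for a non-blocking database or network call.
    await asyncio.sleep(delay)
    return name


async def handle_three_requests():
    start = time.perf_counter()
    # While one coroutine waits, the event loop runs the others.
    results = await asyncio.gather(
        fake_query("a", 0.2),
        fake_query("b", 0.2),
        fake_query("c", 0.2),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(handle_three_requests())
print(results, f"{elapsed:.2f}s")
```

Run sequentially, the same three calls would take about 0.6 seconds. This benefit only holds when every slow call is awaited; a blocking call inside an async endpoint stalls the whole event loop.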
The GET /employees/ endpoint is paginated to prevent it from ever trying to send millions of records at once. It accepts limit and offset query parameters.
GET /employees/?limit=50&offset=100
This request fetches 50 employees, starting after the first 100. This keeps database queries and JSON response payloads small and fast.
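The offset and limit semantics can be sketched on a plain list. The `paginate` helper and the `MAX_LIMIT` cap are hypothetical; the endpoint shown earlier accepts whatever limit the client sends, so the cap is an extra safeguard assumed here rather than something the project is known to do.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")

MAX_LIMIT = 100  # Hypothetical upper bound so a client cannot request everything at once


def paginate(items: Sequence[T], offset: int = 0, limit: int = 100) -> List[T]:
    """Return up to `limit` items, skipping the first `offset` items."""
    if offset < 0 or limit < 1:
        raise ValueError("offset must be >= 0 and limit must be >= 1")
    limit = min(limit, MAX_LIMIT)
    return list(items[offset:offset + limit])


employees = list(range(1, 501))  # Stand-in for 500 employee records
page = paginate(employees, offset=100, limit=50)
print(page[0], page[-1], len(page))  # 101 150 50

last_page = paginate(employees, offset=480, limit=50)
print(len(last_page))  # 20
```

The final page is simply shorter than the limit, which is how a client can tell it has reached the end of the collection.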
To ensure lookups are fast even with millions of rows, database indexes are placed on key fields in models.py. This makes find operations (like finding a user by username or an employee by emp_id) nearly instantaneous (O(log N) complexity).
class Employee(SQLModel, table=True):
    id: Optional[uuid.UUID] = Field(..., primary_key=True)
    emp_id: int = Field(unique=True, index=True)  # <-- Indexed

class User(SQLModel, table=True):
    id: Optional[int] = Field(..., primary_key=True)
    username: str = Field(unique=True, index=True)  # <-- Indexed
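One way to see the effect of an index is to ask the database for its query plan. The sketch below uses the standard library's sqlite3 module as a stand-in for PostgreSQL (an assumption made so the example runs anywhere); the table and index names are illustrative, and PostgreSQL's `EXPLAIN` output is formatted differently.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE employee (id TEXT PRIMARY KEY, emp_id INTEGER, emp_name TEXT)"
)
# Equivalent in spirit to Field(unique=True, index=True) on emp_id.
conn.execute("CREATE UNIQUE INDEX ix_employee_emp_id ON employee (emp_id)")
conn.executemany(
    "INSERT INTO employee VALUES (?, ?, ?)",
    [(f"id-{n}", n, f"Employee {n}") for n in range(1, 1001)],
)


def plan(sql: str) -> str:
    """Return the human-readable detail column of SQLite's query plan."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql).fetchall()
    return " | ".join(row[-1] for row in rows)


indexed_plan = plan("SELECT * FROM employee WHERE emp_id = 500")
unindexed_plan = plan("SELECT * FROM employee WHERE emp_name = 'Employee 500'")
print(indexed_plan)    # mentions USING INDEX ix_employee_emp_id
print(unindexed_plan)  # a full SCAN of the table
```

The lookup on the indexed column is a search through the index, while the lookup on the unindexed column has to scan every row.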
The API uses Pydantic (via SQLModel) to enforce strict data validation rules. This prevents bad data from ever reaching the database.
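- emp_id must be an integer, and emp_dob must be a valid date.
- emp_id must be greater than 0 (ge=1).
- emp_dob (date of birth) cannot be a date in the future.
# This code is in app/schemas.py
from datetime import date
from pydantic import field_validator
from sqlmodel import SQLModel, Field

class EmployeeBase(SQLModel):
    emp_id: int = Field(..., ge=1)  # "ge=1" means "greater than or equal to 1"
    emp_dob: date = Field(...)

    # This custom validator runs on all incoming data.
    # mode='after' means the value is already a date here; with mode='before'
    # it could still be a raw string, and comparing it to a date would fail.
    @field_validator('emp_dob', mode='after')
    @classmethod
    def validate_dob_not_in_future(cls, v: date) -> date:
        if v > date.today():
            raise ValueError("Date of birth cannot be in the future")
        return v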
If a user tries to POST data that violates these rules, the API automatically responds with a descriptive 422 Unprocessable Entity error.
The project includes a complete test suite using pytest to guarantee reliability and prevent regressions.
The most important feature of the test setup is total database isolation.
When pytest runs, it overrides the database dependency and connects to a clean, in-memory SQLite database.
conftest.py defines reusable "fixtures" to make testing simple. The most useful are:
- client: A new, unauthenticated API client.
- normal_user_client: A client that is already logged in as a newly created regular user.
- admin_user_client: A client that is already logged in as a newly created admin user.
These independent clients fixed a complex test isolation bug, ensuring that tests don't share authentication tokens and can run in any order.
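The isolation this setup relies on comes from a property of in-memory SQLite: every new connection to ":memory:" starts with its own empty database. The sketch below demonstrates that property with the standard library's sqlite3 module; `fresh_database` and the `app_user` table are hypothetical names, and the project's real fixtures in conftest.py are not shown in this document.

```python
import sqlite3


def fresh_database() -> sqlite3.Connection:
    """Create a brand-new in-memory database with an empty schema."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE app_user (username TEXT UNIQUE)")
    return conn


# Simulates two tests, each receiving its own database.
first = fresh_database()
first.execute("INSERT INTO app_user VALUES ('alice')")

second = fresh_database()

first_count = first.execute("SELECT COUNT(*) FROM app_user").fetchone()[0]
second_count = second.execute("SELECT COUNT(*) FROM app_user").fetchone()[0]
print(first_count, second_count)  # 1 0
```

Data written through the first connection is invisible to the second, so a test that creates users or employees cannot affect any other test.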
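The test suite in tests/test_api.py covers all critical paths:
- Successful requests return the expected result (200 OK, 201 Created)
- Requests without a valid token are rejected (401 Unauthorized)
- Regular users are blocked from admin-only endpoints (403 Forbidden)
- Invalid input is rejected (422 Unprocessable Entity)
- Creating a duplicate emp_id fails (409 Conflict)
The API exposes the following endpoints:
- /auth/token - Login to get a token.
- /auth/register - Create a new regular user.
- /users/ - Get a list of all users.
- GET /employees/ - Get a paginated, filterable list of employees.
- GET /employees/{emp_id} - Get a single employee.
- POST /employees/ - Create a new employee.
- PATCH /employees/{emp_id} - Update an existing employee.
- DELETE /employees/{emp_id} - Delete an employee.
Python API V.03.
pip install fastapi "uvicorn[standard]"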
uvicorn app.main:app --reload
- app.main: The module path to your file (main.py inside the app package).
- app: The name of the FastAPI() object in your code.
- --reload: This makes the server restart automatically every time you save changes to the file. (This time the database will persist because it's not an in-memory database.)
Open http://127.0.0.1:8000/docs in your browser. You will see an interactive documentation page where you can test all your API endpoints.