Author: Nejat Hakan
Email: nejat.hakan@outlook.de
PayPal Me: https://paypal.me/nejathakan
API Development with FastAPI
Introduction
Welcome to this comprehensive guide on developing APIs (Application Programming Interfaces) using FastAPI on the Linux operating system. In today's software landscape, APIs act as the crucial backbone for communication between different software components, whether between a web frontend and a backend server, between microservices, or between mobile applications and their servers. FastAPI has rapidly emerged as a leading Python framework for building high-performance, easy-to-learn, fast-to-code, and robust APIs.
FastAPI leverages modern Python features, specifically type hints (based on Python 3.6+), making development intuitive and reducing bugs. It's built upon Starlette (for the web parts) and Pydantic (for the data parts), providing asynchronous capabilities out-of-the-box thanks to ASGI (Asynchronous Server Gateway Interface). This makes it exceptionally well-suited for I/O-bound tasks commonly found in web APIs, allowing for high concurrency and performance comparable to Node.js and Go frameworks.
Key benefits of using FastAPI include:
- High Performance: Achieves performance on par with NodeJS and Go, thanks to Starlette and Pydantic.
- Fast Development: Features like type hints and automatic data validation significantly speed up the development process (estimated 200% to 300% faster).
- Fewer Bugs: Python type hints reduce runtime errors, and automatic data validation catches invalid data early.
- Intuitive: Great editor support (autocompletion everywhere) due to type hints.
- Easy: Designed to be easy to use and learn. Less time reading docs.
- Short: Minimize code duplication. Multiple features from each parameter declaration.
- Robust: Get production-ready code. With automatic interactive documentation.
- Standards-based: Based on (and fully compatible with) the open standards for APIs: OpenAPI (previously known as Swagger) and JSON Schema.
This guide is structured progressively, starting from basic concepts and setup, moving through intermediate topics like data validation and application structuring, and finally delving into advanced areas such as asynchronous operations, authentication, database integration, testing, and deployment, specifically tailored for a Linux environment. Each section includes a practical workshop where you'll apply the learned concepts by building and enhancing a real-world Task Management API step-by-step. We assume you have a basic understanding of Python programming and familiarity with the Linux command line. Let's begin building powerful APIs with FastAPI!
1. Basic Concepts and Setup
Before diving into FastAPI code, let's establish a solid foundation by understanding the core concepts involved in API development and how to set up your development environment on Linux.
Core API Concepts
- API (Application Programming Interface): An API is a set of definitions, protocols, and tools for building software applications. In the context of web development, it most commonly refers to a web API that defines how different software systems can interact with each other over a network, typically using HTTP(S). Think of it as a contract defining the requests a client can make and the responses they can expect from a server.
- REST (Representational State Transfer): REST is an architectural style, not a protocol, for designing networked applications. It relies on a stateless client-server communication protocol, almost always HTTP. RESTful APIs use standard HTTP methods to perform operations on resources. Key principles include:
- Client-Server Architecture: Separation of concerns between the client (requesting data) and the server (managing data and logic).
- Statelessness: Each request from a client to the server must contain all the information needed to understand and process the request. The server does not store any client context between requests.
- Cacheability: Responses must define themselves as cacheable or not, to improve performance and scalability.
- Layered System: A client cannot ordinarily tell whether it is connected directly to the end server or to an intermediary along the way.
- Uniform Interface: This simplifies and decouples the architecture. Key constraints include:
  - Resource Identification: Resources (e.g., a user, a task, a product) are identified using URIs (Uniform Resource Identifiers), typically URLs. Example: `/tasks/123`.
  - Resource Manipulation Through Representations: Clients interact with resources via their representations (commonly JSON or XML).
  - Self-descriptive Messages: Each message includes enough information to describe how to process it (e.g., using standard HTTP methods and media types like `application/json`).
  - Hypermedia as the Engine of Application State (HATEOAS): Responses can include links to related actions or resources, allowing clients to navigate the API dynamically (less commonly implemented strictly).
- HTTP Methods: These verbs define the action to be performed on a resource identified by a URL. Common methods used in REST APIs include:
- GET: Retrieve a representation of a resource. (Safe and idempotent)
- POST: Create a new resource. (Not safe, not idempotent)
- PUT: Replace an existing resource entirely or create it if it doesn't exist. (Not safe, idempotent)
- PATCH: Apply partial modifications to an existing resource. (Not safe, not necessarily idempotent)
- DELETE: Remove a resource. (Not safe, idempotent)
- (Safe methods do not change the server state. Idempotent methods mean multiple identical requests have the same effect as a single request.)
- JSON (JavaScript Object Notation): A lightweight data-interchange format that is easy for humans to read and write and easy for machines to parse and generate. It's the de facto standard format for data exchange in modern web APIs due to its simplicity and native support in JavaScript and many other languages, including Python.
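The safe/idempotent distinction above can be made concrete with a toy in-memory store. This is a plain-Python sketch (not FastAPI code, and the function names are illustrative): repeating a PUT leaves the final state unchanged, while repeating a POST keeps creating resources.

```python
# Toy in-memory "resource store" illustrating idempotency of HTTP verbs.
store = {}
next_id = 1

def post_task(data):
    """POST semantics: not idempotent -- each call creates a new resource."""
    global next_id
    task_id = next_id
    next_id += 1
    store[task_id] = data
    return task_id

def put_task(task_id, data):
    """PUT semantics: idempotent -- repeating the call yields the same final state."""
    store[task_id] = data

post_task({"title": "a"})
post_task({"title": "a"})   # a second identical POST creates a second resource
assert len(store) == 2

put_task(1, {"title": "b"})
put_task(1, {"title": "b"})  # a repeated identical PUT changes nothing further
assert store[1] == {"title": "b"} and len(store) == 2
```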
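Python's standard-library `json` module maps JSON objects to dictionaries, which is the same conversion FastAPI performs when serializing responses:

```python
import json

# A task as a Python dict, serialized to the JSON a web API would send
task = {"id": 1, "title": "Write docs", "completed": False}

payload = json.dumps(task)       # dict -> JSON string (note False becomes false)
restored = json.loads(payload)   # JSON string -> dict

assert restored == task
assert payload == '{"id": 1, "title": "Write docs", "completed": false}'
```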
FastAPI Basics
FastAPI makes implementing these concepts straightforward using Python.
- Path Operations: In FastAPI, you define API endpoints using path operation decorators. These decorators link a URL path and an HTTP method to a Python function.
  - Example: `@app.get("/items/{item_id}")` defines an endpoint that responds to HTTP GET requests at paths like `/items/1` or `/items/abc`.
- Path Parameters: Parts of the URL path can be declared as variables using curly braces `{}`. These are passed as arguments to your function. Type hints are used to define the expected data type.
  - Example: In `@app.get("/items/{item_id}")`, `item_id` is a path parameter. The function signature might be `async def read_item(item_id: int):`. FastAPI automatically validates that the path segment can be converted to an integer.
- Query Parameters: Parameters not part of the path structure are automatically interpreted as query parameters (e.g., `/items?skip=0&limit=10`). They are declared as function arguments that are not part of the path. Default values can be provided.
  - Example: `async def read_items(skip: int = 0, limit: int = 10):`.
- Request Body: When a client needs to send data to the API (typically with POST, PUT, PATCH), it's sent in the request body, often as JSON. FastAPI uses Pydantic models to declare the structure and data types of the expected request body.
  - Example: Define a Pydantic model such as `class Item(BaseModel): name: str; price: float`. Then declare it as a function parameter: `async def create_item(item: Item):`. FastAPI automatically reads the request body, parses the JSON, validates it against the `Item` model, and provides the validated data as the `item` argument.
- Pydantic Models: Pydantic is a data validation and settings management library. FastAPI uses it extensively for defining the structure, types, and validation rules for data sent to and from the API. You define data shapes using standard Python type hints in classes inheriting from `pydantic.BaseModel`.
- Automatic Interactive Documentation: One of FastAPI's most powerful features is the automatic generation of interactive API documentation based on your code (using OpenAPI and JSON Schema standards). Simply run your server and navigate to `/docs` (Swagger UI) or `/redoc` (ReDoc) in your browser. This allows you and frontend developers to explore and interact with the API directly.
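To see Pydantic's validation in isolation, outside any endpoint, here is a minimal sketch (the `Item` model and its fields are illustrative, and `pydantic` must be installed):

```python
from pydantic import BaseModel, Field, ValidationError

class Item(BaseModel):
    name: str = Field(min_length=1)
    price: float

# Valid input: note that Pydantic coerces the numeric string to a float
item = Item(name="Widget", price="9.99")
assert item.price == 9.99

# Invalid input raises ValidationError with per-field details
try:
    Item(name="", price="not-a-number")
except ValidationError as exc:
    # one error for the empty name, one for the unparseable price
    assert len(exc.errors()) == 2
```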
Installation on Linux
We'll use `pip`, Python's package installer, and virtual environments to manage project dependencies cleanly.
- Ensure Python 3.6+ and Pip: Most modern Linux distributions come with Python 3. Check your version with `python3 --version` (and `pip3 --version`). If needed, install or update Python and pip using your distribution's package manager (e.g., `sudo apt update && sudo apt install python3 python3-pip python3-venv` on Debian/Ubuntu, or `sudo dnf install python3 python3-pip` on Fedora).
- Create a Project Directory: for example, `mkdir task_manager_api && cd task_manager_api`.
- Create and Activate a Virtual Environment: Using a virtual environment isolates project dependencies from your global Python installation.

  ```bash
  python3 -m venv venv            # Creates a 'venv' directory
  source venv/bin/activate        # Activates the environment (your shell prompt should change)
  ```

  (To deactivate later, simply run `deactivate`.)
- Install FastAPI and Uvicorn: with the virtual environment active, run `pip install fastapi "uvicorn[standard]"`.
  - FastAPI: The core framework.
  - Uvicorn: An ASGI server required to run your FastAPI application.

  The `[standard]` part installs optional, highly recommended dependencies for Uvicorn like `uvloop` and `httptools` for better performance on Linux.
Running a Simple FastAPI App
- Create `main.py`: Inside your `task_manager_api` directory, create a file named `main.py` with the following content:

  ```python
  from fastapi import FastAPI

  # Create a FastAPI instance
  app = FastAPI()

  # Define a path operation decorator for the root path "/"
  @app.get("/")
  async def read_root():
      # Return a dictionary, which FastAPI converts to JSON
      return {"message": "Hello World"}

  # Define a path operation for /items/{item_id} using a path parameter
  @app.get("/items/{item_id}")
  async def read_item(item_id: int, q: str | None = None):
      # item_id is a path parameter (validated as int)
      # q is an optional query parameter (validated as str)
      response = {"item_id": item_id}
      if q:
          response.update({"q": q})
      return response
  ```

- Run the Server: From your terminal (ensure your virtual environment `venv` is activated), run `uvicorn main:app --reload`:
  - `main`: Refers to the file `main.py`.
  - `app`: Refers to the `FastAPI()` instance created inside `main.py`.
  - `--reload`: Makes the server restart automatically after code changes. Use only for development.
- Access the API: Uvicorn will typically start the server on `http://127.0.0.1:8000`.
  - Open your web browser and go to `http://127.0.0.1:8000`. You should see `{"message":"Hello World"}`.
  - Go to `http://127.0.0.1:8000/items/5`. You should see `{"item_id":5}`.
  - Go to `http://127.0.0.1:8000/items/5?q=somequery`. You should see `{"item_id":5,"q":"somequery"}`.
  - Go to `http://127.0.0.1:8000/items/abc`. You will see an error response like `{"detail":[{"loc":["path","item_id"],"msg":"value is not a valid integer","type":"type_error.integer"}]}`, because FastAPI automatically validated `item_id` against the `int` type hint.
- Explore Interactive Docs:
  - Navigate to `http://127.0.0.1:8000/docs`. You'll see the Swagger UI, allowing you to explore and test your endpoints directly from the browser.
  - Navigate to `http://127.0.0.1:8000/redoc`. You'll see alternative documentation provided by ReDoc.
This basic setup forms the foundation upon which we will build our Task Management API in the workshops.
Workshop Getting Started with Your First FastAPI App
In this workshop, we'll create the initial version of our Task Management API. It will allow users to get a list of tasks and add new tasks. We'll store tasks in a simple Python list in memory for now.
Project: Basic Task Management API (In-memory)
Goal: Implement GET (`/tasks`) and POST (`/tasks`) endpoints. Use basic Pydantic models for task creation.
Steps:
- Ensure Setup: Make sure you are in your `task_manager_api` directory with the virtual environment activated (`source venv/bin/activate`) and `fastapi` and `uvicorn[standard]` installed.
- Modify `main.py`: Replace the content of `main.py` with the following:

  ```python
  from fastapi import FastAPI, HTTPException
  from pydantic import BaseModel
  from typing import List, Optional  # Use Optional for compatibility, or | in Python 3.10+

  # --- Pydantic Models ---

  # Define the structure for a Task when creating it (input)
  class TaskCreate(BaseModel):
      title: str
      description: Optional[str] = None  # Optional field with a default value

  # Define the structure for a Task when returning it (output)
  # Includes the ID assigned by the system
  class Task(TaskCreate):
      id: int
      completed: bool = False  # Add a completed status, default to False

  # --- In-Memory Storage ---
  # A simple in-memory store to act as our "database"
  # We use a dictionary to easily find tasks by ID later
  tasks_db = {}
  next_task_id = 1

  # --- FastAPI Application ---
  app = FastAPI(
      title="Simple Task Management API",
      description="A basic API to manage tasks (in-memory version).",
      version="0.1.0",
  )

  # --- API Endpoints (Path Operations) ---

  @app.get("/")
  async def read_root():
      """Root endpoint providing a welcome message."""
      return {"message": "Welcome to the Simple Task Management API"}

  @app.post("/tasks", response_model=Task, status_code=201)
  async def create_task(task_in: TaskCreate):
      """
      Create a new task.

      Takes a task title and optional description in the request body.
      Assigns a unique ID and returns the created task.
      """
      global next_task_id
      new_task_id = next_task_id
      # Create a full Task object from the input model and add server-generated fields
      task_data = task_in.dict()  # Convert Pydantic model to dict
      new_task = Task(**task_data, id=new_task_id, completed=False)
      # Store the task in our "database"
      tasks_db[new_task_id] = new_task
      next_task_id += 1
      # Return the newly created task object
      # FastAPI automatically uses the response_model (Task) for serialization
      return new_task

  @app.get("/tasks", response_model=List[Task])
  async def read_tasks(skip: int = 0, limit: int = 10):
      """
      Retrieve a list of tasks.

      Supports pagination using 'skip' and 'limit' query parameters.
      """
      # Convert the dictionary values (our tasks) to a list
      all_tasks = list(tasks_db.values())
      # Apply pagination
      return all_tasks[skip : skip + limit]

  @app.get("/tasks/{task_id}", response_model=Task)
  async def read_task(task_id: int):
      """
      Retrieve a single task by its ID.

      Returns a 404 error if the task is not found.
      """
      task = tasks_db.get(task_id)
      if task is None:
          # Raise an HTTPException, FastAPI converts this to a proper HTTP error response
          raise HTTPException(status_code=404, detail=f"Task with ID {task_id} not found")
      return task
  ```
- Run the Server: If the server is already running with `--reload`, it should restart automatically. Otherwise, start it with `uvicorn main:app --reload`.
- Interact using Swagger UI:
  - Open `http://127.0.0.1:8000/docs` in your browser.
  - Notice the new `/tasks` endpoints listed.
  - Test POST /tasks:
    - Expand the `POST /tasks` section.
    - Click "Try it out".
    - Modify the example Request body JSON, for example: `{"title": "Learn FastAPI", "description": "Work through the basics"}`.
    - Click "Execute".
    - Observe the Response body (should show the created task with an `id` and `completed: false`) and the `201 Created` status code.
    - Create a few more tasks (e.g., "Buy Groceries", "Read Chapter 2").
  - Test GET /tasks:
    - Expand the `GET /tasks` section.
    - Click "Try it out", then "Execute".
    - Observe the Response body containing a JSON array of the tasks you created.
    - Try changing the `skip` and `limit` parameters and execute again to test pagination.
  - Test GET /tasks/{task_id}:
    - Expand the `GET /tasks/{task_id}` section.
    - Click "Try it out".
    - Enter the `id` of a task you created (e.g., `1`).
    - Click "Execute". Observe the specific task details.
    - Try entering an `id` that doesn't exist (e.g., `999`). Observe the `404 Not Found` error response with the detail message.
- Interact using `curl` (Linux Terminal): `curl` is a powerful command-line tool for making HTTP requests. Open a new terminal window (leave Uvicorn running).
  - Get all tasks: `curl "http://127.0.0.1:8000/tasks" | jq`
  - Get a specific task (e.g., ID 1): `curl "http://127.0.0.1:8000/tasks/1" | jq`
  - Create a new task:

    ```bash
    curl -X POST "http://127.0.0.1:8000/tasks" \
         -H "Content-Type: application/json" \
         -d '{"title": "Learn curl", "description": "Practice API calls from terminal"}' \
         | jq
    ```

    (Explanation: `-X POST` specifies the method. `-H` sets the header. `-d` provides the JSON data payload.)
  - Attempt to get a non-existent task: `curl "http://127.0.0.1:8000/tasks/999" | jq`. You should see the JSON error response generated by the `HTTPException`.
You have now successfully built and interacted with your first FastAPI application, complete with basic CRUD (Create, Read) operations, path/query parameters, request body handling using Pydantic, and basic error handling!
2. Data Validation and Error Handling
One of FastAPI's core strengths lies in its seamless integration with Pydantic for robust data validation and its built-in mechanisms for handling errors gracefully. This section delves deeper into leveraging Pydantic for more complex validation scenarios and customizing error responses.
Advanced Pydantic Validation
Pydantic models are not just for defining data types; they provide a powerful way to enforce constraints on incoming data directly within the model definition. This keeps your API endpoint logic cleaner and focused on business rules rather than repetitive validation checks.
- Built-in Validation Constraints: Pydantic's `Field` function allows you to specify various constraints alongside type hints.
  - Numeric Constraints: `gt` (greater than), `lt` (less than), `ge` (greater than or equal), `le` (less than or equal).
  - String Constraints: `min_length`, `max_length`, `regex`.

    ```python
    from pydantic import BaseModel, Field

    class UserCreate(BaseModel):
        username: str = Field(min_length=3, max_length=50)
        # Simple regex for a basic email format check
        email: str = Field(regex=r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
        password: str = Field(min_length=8)
    ```

  - Other Fields: Default values, aliases (e.g., if your API expects `item-name` but you want to use `item_name` in Python), titles, descriptions (which appear in the OpenAPI docs).

    ```python
    from pydantic import BaseModel, Field
    from typing import Optional

    class Product(BaseModel):
        product_id: str = Field(..., alias="productId", title="Product ID",
                                description="Unique identifier for the product")  # ... means required
        name: str = Field(min_length=1)
        tags: Optional[list[str]] = Field(default=None, max_items=10)  # Optional list, max 10 tags
    ```

- Optional Fields and Default Values: As seen before, use `Optional[Type]` or `Type | None` (Python 3.10+) for optional fields. Provide default values directly or within `Field`.
- Nested Models: You can embed Pydantic models within other models to represent complex JSON structures. FastAPI handles validation recursively.

  ```python
  from pydantic import BaseModel, Field
  from typing import List, Optional

  class Image(BaseModel):
      url: str
      name: str

  class Item(BaseModel):
      name: str
      description: Optional[str] = None
      price: float = Field(gt=0)
      tags: List[str] = []           # List of strings, defaults to empty list
      image: Optional[Image] = None  # Nested model, optional
  ```

  An incoming JSON for this `Item` might look like: `{"name": "Camera", "price": 199.99, "tags": ["tech"], "image": {"url": "http://example.com/cam.jpg", "name": "front view"}}`.
- Custom Validators (`@validator`): For more complex validation logic that Pydantic's built-in constraints don't cover, you can define custom validator methods within your Pydantic model using the `@validator` decorator.

  ```python
  from datetime import date  # needed for the date fields below
  from pydantic import BaseModel, validator

  class Event(BaseModel):
      start_date: date
      end_date: date

      @validator('end_date')
      def end_date_must_be_after_start_date(cls, end_date_value, values):
          # 'values' is a dict containing fields already validated
          start_date_value = values.get('start_date')
          if start_date_value and end_date_value < start_date_value:
              raise ValueError('End date must be after start date')
          return end_date_value
  ```
FastAPI's Automatic Validation Handling
When you declare a Pydantic model as the type hint for a request body parameter (or query/path parameters with type hints), FastAPI automatically performs the following:
- Reads the request body (or relevant request parts).
- Parses the JSON data (if applicable).
- Validates the data against the Pydantic model, including all defined types and constraints.
- If validation succeeds: Passes the validated data (as a Pydantic model instance) to your path operation function.
- If validation fails: FastAPI automatically intercepts the `ValidationError` raised by Pydantic and returns a standard HTTP 422 Unprocessable Entity error response. The response body contains detailed JSON information about the validation errors, including the location (`loc`) and type (`type`) of each error. This is extremely helpful for clients to understand what went wrong.
You saw this earlier when providing a non-integer value for `item_id`. The automatic 422 response is usually sufficient for validation errors.
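The structure of those 422 details mirrors what Pydantic's `ValidationError.errors()` produces, which you can inspect directly. A sketch with an illustrative stand-in model:

```python
from pydantic import BaseModel, ValidationError

class Item(BaseModel):
    item_id: int

try:
    Item(item_id="abc")  # same mistake as visiting /items/abc
except ValidationError as exc:
    errors = exc.errors()
    # Each error entry reports where it occurred ('loc') and what kind it is ('type')
    assert errors[0]["loc"] == ("item_id",)
    assert "int" in errors[0]["type"]
```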
Handling Application Errors (HTTPException)
Beyond data validation, your application logic might encounter other errors (e.g., resource not found, permission denied, business rule violation). FastAPI provides the `HTTPException` class to handle these cases gracefully.
When you raise `HTTPException(status_code=..., detail=...)`, FastAPI catches it and sends an appropriate HTTP error response to the client with the specified status code and detail message (which can be a string or any JSON-serializable structure).
- Common Status Codes:
  - `400 Bad Request`: Generic client-side error (e.g., malformed request syntax, but often 422 is better for validation).
  - `401 Unauthorized`: Authentication is required and has failed or has not yet been provided.
  - `403 Forbidden`: The server understood the request, but refuses to authorize it (client does not have permission).
  - `404 Not Found`: The requested resource could not be found.
  - `409 Conflict`: The request could not be completed due to a conflict with the current state of the resource (e.g., trying to create a resource that already exists with a unique constraint).
  - `422 Unprocessable Entity`: The server understands the content type and syntax of the request entity, but was unable to process the contained instructions (typically validation errors).
  - `500 Internal Server Error`: A generic error message, given when an unexpected condition was encountered and no more specific message is suitable. Avoid letting unexpected Python exceptions bubble up to the user; catch them and potentially raise a 500 HTTPException.
Example (from the previous workshop):

```python
from fastapi import FastAPI, HTTPException

# ... (tasks_db defined elsewhere) ...

@app.get("/tasks/{task_id}", response_model=Task)
async def read_task(task_id: int):
    task = tasks_db.get(task_id)
    if task is None:
        # Explicitly raise 404 Not Found
        raise HTTPException(status_code=404, detail=f"Task with ID {task_id} not found")
    return task
```
Custom Error Handling
While `HTTPException` and the automatic 422 handling cover most common cases, you might need more control:
- Logging Errors: Log exceptions before returning the HTTP response.
- Standardizing Error Formats: Ensure all error responses (validation, custom, unexpected) follow a consistent JSON structure.
- Handling Specific Exception Types: Catch specific exceptions raised by your business logic or third-party libraries and map them to appropriate HTTP errors.
FastAPI allows this using Exception Handlers. You can add custom handlers using the `@app.exception_handler(SpecificExceptionType)` decorator.
```python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
# FastAPI wraps Pydantic errors for incoming requests in RequestValidationError;
# this (not pydantic.ValidationError) is the type to override for request validation
from fastapi.exceptions import RequestValidationError

# Example custom exception for business logic
class BusinessLogicError(Exception):
    def __init__(self, detail: str):
        self.detail = detail

app = FastAPI()

# Custom handler for our specific BusinessLogicError
@app.exception_handler(BusinessLogicError)
async def business_logic_exception_handler(request: Request, exc: BusinessLogicError):
    # Log the error, perform cleanup, etc.
    print(f"Business logic error occurred: {exc.detail}")
    return JSONResponse(
        status_code=400,  # Or another appropriate code
        content={"error_type": "BusinessRuleViolation", "message": exc.detail},
    )

# Override the default handler for request validation errors
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    # You could reformat the error details here if needed
    print(f"Validation error: {exc.errors()}")
    return JSONResponse(
        status_code=422,
        content={"detail": exc.errors(), "message": "Input validation failed"},
    )

# Example endpoint that might raise the custom error
@app.post("/process-data")
async def process_data(data: dict):  # Assuming some complex data structure
    if "forbidden_key" in data:
        raise BusinessLogicError(detail="The key 'forbidden_key' is not allowed.")
    # ... process data ...
    return {"message": "Data processed successfully"}

# Add other endpoints...
```
This gives you fine-grained control over how different types of errors are translated into HTTP responses sent back to the client.
Workshop Enhancing the Task Management API with Validation and Errors
Project: Improve the Task Management API by adding PUT and DELETE endpoints, enhancing the Task model with validation, and ensuring proper error handling.
Goal: Implement update and delete functionality. Add constraints to the Task model. Use `HTTPException` for "not found" scenarios in update/delete.
Steps:
- Ensure Setup: Be in the `task_manager_api` directory with the virtual environment activated. Have the `main.py` from the previous workshop.
- Enhance Pydantic Models: Modify the `TaskCreate` and `Task` models in `main.py` to include more fields and validation constraints using `Field`. Let's also add a `due_date`.

  ```python
  # Add imports if missing
  from fastapi import FastAPI, HTTPException
  from pydantic import BaseModel, Field
  from typing import List, Optional
  from datetime import date  # Import date type

  # --- Pydantic Models ---

  class TaskBase(BaseModel):
      # Base model with common fields
      title: str = Field(..., min_length=3, max_length=50,
                         description="The title of the task (must be between 3 and 50 chars)")
      description: Optional[str] = Field(None, max_length=500,
                                         description="Optional description (max 500 chars)")
      due_date: Optional[date] = Field(None, description="Optional due date in YYYY-MM-DD format")

  class TaskCreate(TaskBase):
      # Model for creating tasks (inherits from TaskBase)
      pass  # No additional fields needed for creation yet

  class TaskUpdate(TaskBase):
      # Model for updating tasks (all fields are optional for partial updates).
      # Override fields to make them optional for PATCH-like behavior with PUT.
      # A more RESTful PUT might require all fields, but this is common practice.
      title: Optional[str] = Field(None, min_length=3, max_length=50)
      description: Optional[str] = Field(None, max_length=500)
      due_date: Optional[date] = None
      completed: Optional[bool] = None  # Allow updating completion status

  class Task(TaskBase):
      # Model for returning tasks (includes generated fields)
      id: int
      completed: bool = False

      # Pydantic configuration class (optional but good practice)
      class Config:
          orm_mode = True  # Important later for SQLAlchemy integration
          # Example shown in the OpenAPI docs
          schema_extra = {
              "example": {
                  "id": 1,
                  "title": "Example Task",
                  "description": "This is an example task description.",
                  "due_date": "2024-12-31",
                  "completed": False,
              }
          }

  # --- In-Memory Storage (Keep as is for now) ---
  tasks_db = {}
  next_task_id = 1

  # --- FastAPI Application ---
  app = FastAPI(
      title="Enhanced Task Management API",
      description="API with validation, error handling, update & delete.",
      version="0.2.0",
  )

  # --- API Endpoints ---

  @app.get("/")
  async def read_root():
      return {"message": "Welcome to the Enhanced Task Management API"}

  # Modify create_task to use the updated Task model structure
  @app.post("/tasks", response_model=Task, status_code=201)
  async def create_task(task_in: TaskCreate):
      """Create a new task with validation."""
      global next_task_id
      new_task_id = next_task_id
      # Create a full Task object; completed defaults to False here.
      new_task = Task(**task_in.dict(), id=new_task_id, completed=False)
      tasks_db[new_task_id] = new_task
      next_task_id += 1
      return new_task

  # Keep GET /tasks and GET /tasks/{task_id} as they were (they use the new Task model now)
  @app.get("/tasks", response_model=List[Task])
  async def read_tasks(skip: int = 0, limit: int = 10):
      all_tasks = list(tasks_db.values())
      return all_tasks[skip : skip + limit]

  @app.get("/tasks/{task_id}", response_model=Task)
  async def read_task(task_id: int):
      task = tasks_db.get(task_id)
      if task is None:
          raise HTTPException(status_code=404, detail=f"Task with ID {task_id} not found")
      return task

  # Add PUT endpoint for updating tasks
  @app.put("/tasks/{task_id}", response_model=Task)
  async def update_task(task_id: int, task_in: TaskUpdate):
      """
      Update an existing task by ID.

      Uses the TaskUpdate model, allowing partial updates.
      Returns 404 if the task is not found.
      """
      task_to_update = tasks_db.get(task_id)
      if task_to_update is None:
          raise HTTPException(status_code=404, detail=f"Task with ID {task_id} not found for update")

      # Get the data from the input model, excluding unset fields for partial update
      update_data = task_in.dict(exclude_unset=True)

      # Pydantic models are best treated as immutable, so merge the changes
      # into a new, re-validated model instance:
      updated_task_data = task_to_update.dict()  # Get current data
      updated_task_data.update(update_data)      # Apply changes
      updated_task = Task(**updated_task_data)   # Create new validated model instance

      # Store the updated task back
      tasks_db[task_id] = updated_task
      return updated_task

  # Add DELETE endpoint for deleting tasks
  @app.delete("/tasks/{task_id}", status_code=204)  # 204 No Content is typical for successful DELETE
  async def delete_task(task_id: int):
      """
      Delete a task by ID.

      Returns 404 if the task is not found.
      Returns 204 No Content on successful deletion.
      """
      if task_id not in tasks_db:
          raise HTTPException(status_code=404, detail=f"Task with ID {task_id} not found for deletion")
      del tasks_db[task_id]
      # No response body needed for 204, so return None
      # (or: return Response(status_code=status.HTTP_204_NO_CONTENT)
      #  after importing Response and status)
      return None
  ```

  Self-correction: The update logic for in-memory Pydantic models needs care. Directly assigning `task_to_update.title = new_title` might work if the object allows it, but Pydantic models are often treated as immutable; the safer approach is creating a new model instance with the updated data, as shown above (`updated_task = Task(**updated_task_data)`). `exclude_unset=True` in `task_in.dict(exclude_unset=True)` is crucial for partial updates – it only includes fields that were actually provided in the request body. Also, ensure all necessary imports (`date`, `Field`) are present. `status_code=204` on DELETE and `status_code=201` on POST are set explicitly for clarity, and returning `None` from a `DELETE` endpoint with `status_code=204` is idiomatic in FastAPI.
- Run the Server: if it is already running with `--reload`, it should restart automatically. Otherwise, start it with `uvicorn main:app --reload`.
- Test Validation (Swagger UI / `curl`):
  - Go to `http://127.0.0.1:8000/docs`.
  - Test POST /tasks validation:
    - Try creating a task with a title that is too short (e.g., `"t"`). Execute. Observe the `422 Unprocessable Entity` error detailing the `min_length` violation.
    - Try creating a task with an invalid date format in `due_date` (e.g., `"31-12-2024"` instead of `"2024-12-31"`). Observe the 422 error.
    - Create a valid task and note its ID (e.g., the ID might be 1 if you restarted the server).
  - Test PUT /tasks/{task_id}:
    - Expand `PUT /tasks/{task_id}`. Try it out.
    - Enter the ID of the task you just created.
    - Provide a request body that updates only the description and completed status, for example: `{"description": "Updated description", "completed": true}`.
    - Execute. Observe the updated task in the response.
    - Verify with `GET /tasks/{task_id}` that the changes persisted (in memory).
    - Try updating a non-existent task ID (e.g., 999). Observe the `404 Not Found` error.
    - Try updating with invalid data (e.g., title too long). Observe the `422 Unprocessable Entity` error.
  - Test DELETE /tasks/{task_id}:
    - Expand `DELETE /tasks/{task_id}`. Try it out.
    - Enter the ID of the task you created/updated.
    - Execute. Observe the `204 No Content` response code (Swagger UI might show "No response body").
    - Try getting the deleted task ID using `GET /tasks/{task_id}`. Observe the `404 Not Found` error.
    - Try deleting a non-existent task ID. Observe the `404 Not Found` error.
- Test with `curl` (Optional but Recommended):
  - Update a task (replace `<TASK_ID>` with an actual ID): `curl -X PUT "http://127.0.0.1:8000/tasks/<TASK_ID>" -H "Content-Type: application/json" -d '{"completed": true}' | jq`
  - Attempt to update non-existent task 99: `curl -X PUT "http://127.0.0.1:8000/tasks/99" -H "Content-Type: application/json" -d '{"title": "Nope"}' | jq`
  - Delete a task: `curl -i -X DELETE "http://127.0.0.1:8000/tasks/<TASK_ID>"` (the `-i` flag shows the `204 No Content` status line)
  - Attempt to delete non-existent task 99: `curl -i -X DELETE "http://127.0.0.1:8000/tasks/99"`
You've now enhanced the API with update and delete capabilities, added more sophisticated data validation using Pydantic's `Field`, and ensured that appropriate HTTP errors are returned for common scenarios like "not found".
3. Structuring FastAPI Applications
As your API grows, putting all the code into a single `main.py` file becomes unmanageable. Maintaining code, collaborating with others, and navigating the project become difficult. FastAPI provides tools and encourages patterns to structure larger applications effectively, primarily through `APIRouter` and Dependency Injection.
The Need for Structure
A well-structured application offers several advantages:
- Maintainability: Easier to find, understand, and modify specific parts of the codebase.
- Reusability: Components like database connections or authentication logic can be reused across different parts of the API.
- Scalability: Easier to add new features or endpoints without disrupting existing functionality.
- Testability: Smaller, focused modules are easier to test in isolation.
- Collaboration: Different team members can work on different parts (routers, models) simultaneously with fewer conflicts.
A common approach involves separating concerns into different directories and modules:
- `main.py`: The main application entry point, responsible for creating the `FastAPI` instance and including routers.
- `routers/`: Contains modules defining related groups of endpoints using `APIRouter` (e.g., `routers/tasks.py`, `routers/users.py`).
- `schemas/` (or `models/` or `pydantic_models/`): Contains Pydantic models defining data shapes for request bodies and responses. (Sometimes split: `schemas` for API data shapes, `models` for database ORM models.)
- `crud/` or `services/` or `logic/`: Contains functions that encapsulate the business logic and data manipulation (e.g., interacting with the database). This keeps the router functions thin.
- `db/` or `database/`: Contains database connection setup, session management, and potentially ORM model definitions (if not in a separate `models/` directory).
- `core/` or `config/`: For application configuration and settings management.
- `dependencies/`: For reusable dependency functions (authentication, database sessions, etc.).
- `tests/`: Contains application tests.
Using `APIRouter`
An `APIRouter` works like a "mini" `FastAPI` application. You can define path operations, parameters, responses, dependencies, etc., on an `APIRouter` instance within a separate module. Then, you "include" this router in your main `FastAPI` application.
This allows you to group related endpoints together. For example, all task-related endpoints (`/tasks`, `/tasks/{task_id}`) can live in `routers/tasks.py`.
Example:
- Create `routers/tasks.py`:

```python
# In routers/tasks.py
from fastapi import APIRouter, HTTPException, Depends
from typing import List

# Assume Pydantic models (Task, TaskCreate, TaskUpdate) are defined elsewhere
# e.g., in schemas/tasks.py
from ..schemas.tasks import Task, TaskCreate, TaskUpdate
# Assume some data storage mechanism exists (can be passed via dependency later)
from .. import crud  # Assuming crud functions are in crud.py or similar

# Create an APIRouter instance
router = APIRouter(
    prefix="/tasks",  # All paths in this router will start with /tasks
    tags=["Tasks"],   # Group endpoints in the OpenAPI docs under "Tasks"
    responses={404: {"description": "Task not found"}},  # Default response for this router
)

# In-memory DB for this example (will be replaced)
tasks_db_router = {}
next_task_id_router = 1

# Define path operations using the router instance
@router.post("/", response_model=Task, status_code=201)  # Path is relative to prefix: /tasks/
async def create_task_route(task_in: TaskCreate):
    # In a real app, call a CRUD function:
    # return crud.create_task(db_session=db, task=task_in)
    global next_task_id_router
    new_task_id = next_task_id_router
    new_task = Task(**task_in.dict(), id=new_task_id, completed=False)
    tasks_db_router[new_task_id] = new_task
    next_task_id_router += 1
    return new_task

@router.get("/", response_model=List[Task])
async def read_tasks_route(skip: int = 0, limit: int = 10):
    # return crud.get_tasks(db_session=db, skip=skip, limit=limit)
    all_tasks = list(tasks_db_router.values())
    return all_tasks[skip : skip + limit]

@router.get("/{task_id}", response_model=Task)
async def read_task_route(task_id: int):
    # task = crud.get_task(db_session=db, task_id=task_id)
    task = tasks_db_router.get(task_id)
    if task is None:
        raise HTTPException(status_code=404, detail=f"Task with ID {task_id} not found")
    return task

@router.put("/{task_id}", response_model=Task)
async def update_task_route(task_id: int, task_in: TaskUpdate):
    # return crud.update_task(db_session=db, task_id=task_id, task_update=task_in)
    task_to_update = tasks_db_router.get(task_id)
    if task_to_update is None:
        raise HTTPException(status_code=404)  # Uses default 404 response defined in APIRouter
    update_data = task_in.dict(exclude_unset=True)
    updated_task_data = task_to_update.dict()
    updated_task_data.update(update_data)
    updated_task = Task(**updated_task_data)
    tasks_db_router[task_id] = updated_task
    return updated_task

@router.delete("/{task_id}", status_code=204)
async def delete_task_route(task_id: int):
    # result = crud.delete_task(db_session=db, task_id=task_id)
    # if not result: raise HTTPException(...)
    if task_id not in tasks_db_router:
        raise HTTPException(status_code=404)
    del tasks_db_router[task_id]
    return None

# Note: Relative imports like 'from ..schemas.tasks import Task' assume a package structure.
# Running 'uvicorn main:app' from the project root usually requires the code
# to be structured as a Python package (with __init__.py files).
```

Self-correction: Relative imports (`from ..schemas.tasks`) require the project to be structured as a Python package. This means having empty `__init__.py` files in the directories (`task_manager_api/`, `task_manager_api/routers/`, `task_manager_api/schemas/`) so Python recognizes them as packages. When running Uvicorn, you might need to adjust the import path or run Python using the `-m` flag (e.g., `python -m uvicorn app.main:app --reload` if your main code is in `app/main.py`). Also, note the path for `@router.post("/")` becomes `/tasks/` because of the `prefix`.
- Include the router in `main.py`:

```python
# In main.py (or app/main.py if structured)
from fastapi import FastAPI

# Assuming routers are in a 'routers' sub-directory/package
from .routers import tasks  # Or: from routers import tasks

app = FastAPI(
    title="Structured Task Management API",
    description="Refactored API using APIRouter.",
    version="0.3.0",
)

# Include the task router
app.include_router(tasks.router)

# You can include other routers here (e.g., users, items)
# from .routers import users
# app.include_router(users.router)

# Add root endpoint directly to the main app if needed
@app.get("/")
async def read_root():
    return {"message": "Welcome to the Structured Task Management API"}

# Pydantic models, DB connections, etc., would typically be imported
# into the router files or passed via dependencies, not defined here.
```

Now, the task-specific logic is neatly contained within `routers/tasks.py`.
Dependency Injection (`Depends`)
Dependency Injection is a powerful pattern used extensively in FastAPI. It allows you to declare dependencies (like a database session, the current user, configuration settings, or even complex computations) that your path operation functions need. FastAPI takes care of executing these dependencies and injecting their results into your function's parameters.
- How it Works:
  - You write a function (the "dependency" function) that performs some setup, yields a value, and potentially performs some teardown. Common dependencies return database sessions, check authentication tokens, or load configurations.
  - In your path operation function signature, you declare a parameter with a default value of `Depends(your_dependency_function)`.
  - When a request comes in, FastAPI calls `your_dependency_function`.
  - The value returned (or yielded) by the dependency function is passed as an argument to your path operation function.
  - If the dependency function uses `yield`, the code after `yield` is executed after the response has been sent (useful for cleanup like closing database connections).
- Benefits:
- Reusability: Write the logic for getting a database session or authenticating a user once and reuse it across many endpoints.
- Decoupling: Path operation functions don't need to know how to get a database session or user; they just declare that they need one.
- Testability: Dependencies can be easily overridden during testing (e.g., providing a mock database session).
- Organization: Keeps setup/teardown logic separate from the core endpoint logic.
- Example (Conceptual Database Dependency):

```python
# In a file like dependencies.py or db/session.py
from typing import Generator  # Or AsyncGenerator for async dependencies

# Placeholder for actual DB session logic
def get_fake_db_session():
    print("DEBUG: Getting fake DB session")
    db_session = {"type": "fake", "data": {}}  # Simulate a session object
    try:
        yield db_session  # Provide the session to the endpoint
    finally:
        # This code runs after the response is sent
        print("DEBUG: Closing fake DB session")

# In routers/tasks.py
from fastapi import Depends, APIRouter
# ... other imports ...
# from ..dependencies import get_fake_db_session

router = APIRouter(prefix="/tasks", tags=["Tasks"])

@router.get("/")
async def read_tasks_with_db(
    skip: int = 0,
    limit: int = 10,
    # Declare the dependency: FastAPI will call get_fake_db_session
    # and pass its yielded value to the 'db' parameter.
    db: dict = Depends(get_fake_db_session)
):
    print(f"DEBUG: Inside read_tasks_with_db, received db: {db}")
    # Use the 'db' session here to fetch tasks
    # tasks = fetch_tasks_from_db(db, skip, limit)
    return [{"id": 1, "title": "Task fetched using fake DB"},
            {"id": 2, "title": "Another task"}]  # Placeholder return

# Other endpoints would also use Depends(get_fake_db_session)
```

When you call the `/tasks/` endpoint now, you'll see the "Getting fake DB session" and "Closing fake DB session" messages printed in the Uvicorn console, demonstrating the setup/teardown cycle managed by FastAPI. The `db` parameter inside `read_tasks_with_db` will receive the dictionary yielded by `get_fake_db_session`.
We will implement actual database dependencies in the "Databases and ORMs" section.
Workshop Refactoring the Task Management API for Better Structure
Project: Restructure the Task Management API code using `APIRouter` and prepare for dependency injection.
Goal: Move task endpoints to a dedicated router file. Organize files into a conventional structure. Introduce a placeholder dependency.
Steps:
- Create Directory Structure: Organize your project like this:

```
task_manager_api/
├── venv/                 # Your virtual environment
├── app/                  # Main application package directory
│   ├── __init__.py       # Makes 'app' a package
│   ├── main.py           # Main FastAPI app instance, includes routers
│   ├── routers/
│   │   ├── __init__.py   # Makes 'routers' a package
│   │   └── tasks.py      # Task-related endpoints (APIRouter)
│   ├── schemas/
│   │   ├── __init__.py   # Makes 'schemas' a package
│   │   └── tasks.py      # Pydantic models for tasks
│   ├── dependencies.py   # Placeholder for dependencies
│   └── crud.py           # Placeholder for CRUD logic (optional for now)
└── requirements.txt      # Will add later
```

Create the directories (`app`, `app/routers`, `app/schemas`) and the empty `__init__.py` files.
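One way to create that skeleton from the shell (assuming you are already inside `task_manager_api/`):

```shell
mkdir -p app/routers app/schemas
touch app/__init__.py app/routers/__init__.py app/schemas/__init__.py
touch app/main.py app/routers/tasks.py app/schemas/tasks.py app/dependencies.py app/crud.py
```

The `__init__.py` files can stay empty; their presence is what makes Python treat the directories as packages.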
- Move Pydantic Models (`app/schemas/tasks.py`): Cut the Pydantic model definitions (`TaskBase`, `TaskCreate`, `TaskUpdate`, `Task`) from your previous `main.py` and paste them into the new file `app/schemas/tasks.py`. Add necessary imports at the top.

```python
# In app/schemas/tasks.py
from pydantic import BaseModel, Field
from typing import Optional
from datetime import date

# --- Pydantic Models ---
class TaskBase(BaseModel):
    title: str = Field(..., min_length=3, max_length=50,
                       description="The title of the task (must be between 3 and 50 chars)")
    description: Optional[str] = Field(None, max_length=500,
                                       description="Optional description (max 500 chars)")
    due_date: Optional[date] = Field(None, description="Optional due date in YYYY-MM-DD format")

class TaskCreate(TaskBase):
    pass

class TaskUpdate(TaskBase):
    title: Optional[str] = Field(None, min_length=3, max_length=50)
    description: Optional[str] = Field(None, max_length=500)
    due_date: Optional[date] = None
    completed: Optional[bool] = None

class Task(TaskBase):
    id: int
    completed: bool = False

    class Config:
        orm_mode = True
        schema_extra = {
            "example": {
                "id": 1,
                "title": "Example Task",
                "description": "This is an example task description.",
                "due_date": "2024-12-31",
                "completed": False,
            }
        }
```
- Create Placeholder Dependency (`app/dependencies.py`): Add the simple `get_fake_db_session` function.

```python
# In app/dependencies.py
from typing import Generator

# Placeholder for actual DB session logic.
# In a real app, this would handle database connection pooling and sessions.
def get_fake_db_session() -> Generator[dict, None, None]:
    print("DEBUG: Getting fake DB session")
    # Simulate a session or connection object/dictionary.
    # Later this will yield a SQLAlchemy SessionLocal() instance.
    db_session = {"type": "fake", "connection_status": "connected", "data_store": {}}
    try:
        yield db_session  # Provide the session to the endpoint
    finally:
        # This code runs after the response is sent (e.g., close connection/session)
        print(f"DEBUG: Closing fake DB session. Final state: {db_session}")
        # In a real DB session: db_session.close()
```

Self-correction: Added the type hint `-> Generator[dict, None, None]` for clarity.
- Create Task Router (`app/routers/tasks.py`): Move the task-related path operation functions (POST, GET all, GET one, PUT, DELETE) into this file. Use `APIRouter`. Import models from `app.schemas.tasks` and the dependency from `app.dependencies`. Remove the global in-memory storage and use the fake dependency instead (we won't implement the full logic with the fake DB yet, just inject it).

```python
# In app/routers/tasks.py
from fastapi import APIRouter, HTTPException, Depends, status  # Import status for codes
from typing import List

# Use relative imports within the 'app' package
from ..schemas.tasks import Task, TaskCreate, TaskUpdate
from ..dependencies import get_fake_db_session

# Create an APIRouter instance
router = APIRouter(
    prefix="/tasks",  # All paths in this router will start with /tasks
    tags=["Tasks"],   # Group endpoints in the OpenAPI docs under "Tasks"
    responses={404: {"description": "Task not found"}},  # Default response for this router
)

# We no longer use global variables for storage here.
# Logic will depend on the injected 'db' session.
# For now, we just demonstrate injection and return placeholder data.

@router.post("/", response_model=Task, status_code=status.HTTP_201_CREATED)
async def create_task_route(
    task_in: TaskCreate,
    db: dict = Depends(get_fake_db_session)  # Inject fake DB
):
    """Create a new task (placeholder implementation)."""
    print(f"DEBUG: Received task to create: {task_in.dict()}")
    print(f"DEBUG: Using DB session: {db}")
    # In a real app: call crud.create_task(db=db, task=task_in)
    new_task_id = len(db.get("data_store", {})) + 1  # Simulate getting the next ID
    new_task = Task(**task_in.dict(), id=new_task_id, completed=False)
    db["data_store"][new_task_id] = new_task  # Simulate storing in the fake DB session
    print(f"DEBUG: Simulated storing task {new_task_id}")
    return new_task

@router.get("/", response_model=List[Task])
async def read_tasks_route(
    skip: int = 0,
    limit: int = 10,
    db: dict = Depends(get_fake_db_session)  # Inject fake DB
):
    """Retrieve a list of tasks (placeholder implementation)."""
    print(f"DEBUG: Fetching tasks with skip={skip}, limit={limit}")
    # In a real app: call crud.get_tasks(db=db, skip=skip, limit=limit)
    all_tasks = list(db.get("data_store", {}).values())
    return all_tasks[skip : skip + limit]

@router.get("/{task_id}", response_model=Task)
async def read_task_route(
    task_id: int,
    db: dict = Depends(get_fake_db_session)  # Inject fake DB
):
    """Retrieve a single task by ID (placeholder implementation)."""
    print(f"DEBUG: Fetching task with ID: {task_id}")
    # In a real app: call crud.get_task(db=db, task_id=task_id)
    task = db.get("data_store", {}).get(task_id)
    if task is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                            detail=f"Task with ID {task_id} not found")
    return task

@router.put("/{task_id}", response_model=Task)
async def update_task_route(
    task_id: int,
    task_in: TaskUpdate,
    db: dict = Depends(get_fake_db_session)  # Inject fake DB
):
    """Update an existing task by ID (placeholder implementation)."""
    print(f"DEBUG: Updating task ID: {task_id} with data: {task_in.dict(exclude_unset=True)}")
    # In a real app: call crud.update_task(db=db, task_id=task_id, task_update=task_in)
    task_to_update = db.get("data_store", {}).get(task_id)
    if task_to_update is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
    update_data = task_in.dict(exclude_unset=True)
    # Create a new updated model instance (as Pydantic models are often immutable)
    updated_task_data = task_to_update.dict()
    updated_task_data.update(update_data)
    updated_task = Task(**updated_task_data)
    db["data_store"][task_id] = updated_task  # Simulate the update
    return updated_task

@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_task_route(
    task_id: int,
    db: dict = Depends(get_fake_db_session)  # Inject fake DB
):
    """Delete a task by ID (placeholder implementation)."""
    print(f"DEBUG: Deleting task ID: {task_id}")
    # In a real app: call crud.delete_task(db=db, task_id=task_id)
    if task_id not in db.get("data_store", {}):
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
    del db["data_store"][task_id]  # Simulate deletion
    return None  # Return None for 204 No Content
```

Self-correction: Used relative imports (`from ..schemas.tasks`). Imported `status` for semantic status codes. Updated the placeholder logic to actually use the injected `db` dictionary to simulate storage across calls within the same request's dependency scope, although it won't persist between requests yet. Removed the global variables for storage.
- Update Main Application File (`app/main.py`): Modify the main file to import and include the task router. Keep it clean, focusing on app creation and router inclusion.

```python
# In app/main.py
from fastapi import FastAPI

# Use relative import for routers within the same package
from .routers import tasks

# Create the main FastAPI application instance
app = FastAPI(
    title="Structured Task Management API",
    description="Refactored API using APIRouter and basic Dependency Injection.",
    version="0.3.0",
    # You can add other metadata like contact info, license, etc.
    # openapi_tags=...  # Define tags used by routers here for ordering/description
)

# Include routers from the 'routers' module.
# The prefix="/tasks" and tags=["Tasks"] are defined within tasks.router.
app.include_router(tasks.router)

# Example: you could add another router like this if you had one
# from .routers import users
# app.include_router(users.router, prefix="/users", tags=["Users"])

# Root endpoint for a basic connectivity check
@app.get("/")
async def read_root():
    """Provides a welcome message and basic API status."""
    return {"message": "Welcome to the Task Management API!"}

# Note: Pydantic models, CRUD functions, and dependencies now live in their
# respective modules, keeping main.py clean.
```

Self-correction: Changed the import to `from .routers import tasks` to reflect the package structure.
- Run the Refactored App: Now, you need to tell Uvicorn where your `FastAPI` app instance is located using the new path. Make sure you are in the root directory (`task_manager_api/`, the one containing `app/`).

```bash
# Ensure your virtual environment is active:
source venv/bin/activate
# Run uvicorn targeting the 'app' object inside 'app/main.py'
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
```

(Using `--host 0.0.0.0` makes it accessible from other devices on your network, useful if testing from a different machine or container. Keep using `127.0.0.1` if preferred.)
- Open
http://127.0.0.1:8000/docs
(or your server's IP if using0.0.0.0
). - Verify that all task endpoints are listed under the "Tasks" tag.
- Test creating, reading, updating, and deleting tasks as before.
- Observe the
DEBUG
messages printed in the Uvicorn console output. Notice how "Getting fake DB session" and "Closing fake DB session" wrap each API call that uses the dependency. Notice how the fake "data_store" resets for each request because the dependency creates a new dictionary each time it's called. This highlights why we need a real database connection managed properly later.
- Open
You have successfully refactored the API into a more organized structure using `APIRouter`, separated concerns into different files (schemas, routers, dependencies), and introduced the concept of dependency injection, paving the way for more complex features like database integration and authentication.
4. Asynchronous Operations
Modern web applications, especially APIs, often need to perform operations that involve waiting – waiting for database queries, network requests to other services, or reading/writing files. Traditional synchronous code handles these waits by blocking the entire execution thread, meaning the server cannot process other requests until the wait is over. Asynchronous programming, particularly with Python's `async`/`await` features, provides a solution to handle I/O-bound tasks efficiently, allowing a single process to manage many concurrent connections. FastAPI is built from the ground up with async capabilities, making it highly performant for these scenarios.
Synchronous vs Asynchronous Programming
- Synchronous (Sync): Code executes step-by-step. When an operation needs to wait (e.g., fetching data from a database), the entire thread pauses (blocks) until the operation completes. In a web server context, a blocked thread cannot handle any other incoming requests. To handle concurrency, traditional sync servers often use multiple threads or processes, which can consume significant memory and CPU resources.

```python
# Synchronous Example
import time

def do_sync_work(item_id):
    print(f"Starting sync work for {item_id}")
    # Simulate I/O wait (e.g., database query)
    time.sleep(1)  # Blocks execution for 1 second
    print(f"Finished sync work for {item_id}")
    return f"Result for {item_id}"

start = time.time()
result1 = do_sync_work(1)
result2 = do_sync_work(2)
end = time.time()
print(f"Total sync time: {end - start:.2f} seconds")  # Output: ~2.00 seconds
```

Here, `do_sync_work(2)` only starts after `do_sync_work(1)` is completely finished.
- Asynchronous (Async): Code execution can be paused during waiting operations without blocking the entire thread. When an async function encounters an `await` call for an I/O operation, it yields control back to the event loop. The event loop can then run other tasks (like handling another incoming request or continuing another paused async function). When the awaited I/O operation completes, the event loop schedules the paused async function to resume execution from where it left off. This allows a single thread to handle many concurrent I/O-bound tasks efficiently.

```python
# Asynchronous Example (requires asyncio)
import asyncio
import time

async def do_async_work(item_id):
    print(f"Starting async work for {item_id}")
    # Simulate async I/O wait (e.g., async DB query, httpx request)
    await asyncio.sleep(1)  # Non-blocking sleep
    print(f"Finished async work for {item_id}")
    return f"Result for {item_id}"

async def main():
    start = time.time()
    # Schedule both tasks to run concurrently
    task1 = asyncio.create_task(do_async_work(1))
    task2 = asyncio.create_task(do_async_work(2))
    # Wait for both tasks to complete
    result1 = await task1
    result2 = await task2
    end = time.time()
    print(f"Total async time: {end - start:.2f} seconds")  # Output: ~1.00 second

# Run the main async function using the event loop
asyncio.run(main())
```

Here, `do_async_work(1)` and `do_async_work(2)` run concurrently. The `await asyncio.sleep(1)` pauses each task individually, but the event loop can switch between them (or handle other work), so the total time is roughly the duration of the longest single task.
Python's `async` and `await`
- `async def`: Used to define a coroutine function (the basis of async operations). An `async def` function, when called, returns a coroutine object, not the result directly.
- `await`: Used inside an `async def` function to pause its execution and wait for an awaitable object (like another coroutine or a task) to complete. The `await` keyword can only be used inside an `async def` function.
- Event Loop: The core of asyncio. It manages and distributes the execution of different asynchronous tasks. When a task awaits an operation, the event loop takes control and runs other ready tasks.
Why Async is Crucial for APIs
Web APIs are inherently I/O-bound. Most requests involve:
- Reading data from a network socket (the incoming request).
- Querying a database over the network.
- Calling other external APIs over the network.
- Reading/writing files from disk.
- Writing data back to the network socket (the response).
Using synchronous code means your server process can get stuck waiting for these operations, limiting the number of concurrent users it can handle effectively. By using `async def` for your path operation functions in FastAPI, you leverage the underlying ASGI server (like Uvicorn) and its event loop to handle many requests concurrently with fewer resources.
Using `async def` in FastAPI
FastAPI fully supports `async def` for path operation functions. If your endpoint performs I/O (database calls, external API requests), you should declare it as `async def` and use `await` when calling async libraries.
```python
from fastapi import FastAPI, HTTPException
import time
import httpx  # An async-capable HTTP client library

app = FastAPI()

# Using an async path operation function
@app.get("/fetch-external-data")
async def fetch_data():
    # Use an async HTTP client
    async with httpx.AsyncClient() as client:
        print("Making external API call...")
        # await pauses this function, allowing the server to handle other requests
        response = await client.get("https://httpbin.org/delay/1")  # Simulates a 1-second delay
        print("External call completed.")
    if response.status_code == 200:
        return response.json()
    else:
        raise HTTPException(status_code=response.status_code, detail="Failed to fetch data")

# You can mix async and sync endpoints
@app.get("/sync-endpoint")
def sync_operation():
    # This runs synchronously in a thread pool managed by FastAPI/Starlette
    # if called by an async server. Good for CPU-bound work or sync libraries.
    print("Executing synchronous code")
    time.sleep(0.5)  # This will block the worker thread, not the event loop
    return {"message": "Sync operation complete"}
```
Important: If you define a path operation function with normal `def` instead of `async def`, FastAPI is smart enough to run it in an external thread pool when running under an async server like Uvicorn. This prevents it from blocking the main event loop. However, for code that is inherently I/O-bound and has async library support (like databases or HTTP clients), using `async def` and `await` is generally more efficient. Use standard `def` primarily for CPU-bound work or when interacting with libraries that only offer synchronous interfaces.
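What FastAPI does for plain `def` endpoints can be sketched with asyncio's own thread-pool offloading; this illustrates the mechanism, not Starlette's actual implementation:

```python
import asyncio
import time

def blocking_work():
    # Stands in for a sync library call (e.g., a blocking DB driver)
    time.sleep(0.2)
    return "done"

async def main() -> float:
    loop = asyncio.get_running_loop()
    start = time.time()
    # Offload the blocking call to a worker thread so the event loop stays
    # free; run an async sleep concurrently to prove nothing was blocked.
    result, _ = await asyncio.gather(
        loop.run_in_executor(None, blocking_work),
        asyncio.sleep(0.2),
    )
    print(result)
    return time.time() - start

elapsed = asyncio.run(main())
print(f"elapsed: {elapsed:.2f}s")  # ~0.2s, not 0.4s: the two waits overlapped
```

If `blocking_work()` had been awaited on the event loop thread directly (or simply called), the two 0.2-second waits would have run back to back.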
Integrating with Async Libraries
To get the full benefit of `async def` in FastAPI, you need to use libraries that also support asynchronous operations for I/O tasks.
- HTTP Clients: `httpx` is a popular choice that provides both sync and async APIs.
- Databases:
  - PostgreSQL: `asyncpg` is a high-performance driver. `psycopg` (version 3+) also offers async support.
  - MySQL: `aiomysql`
  - SQLite: `aiosqlite`
- ORM / Database Toolkits:
  - SQLAlchemy (version 1.4+ has async support, significantly enhanced in 2.0) using async drivers.
  - `databases`: A library providing simple async database interaction over SQLAlchemy Core.
  - `Tortoise ORM`: An asyncio ORM inspired by Django's ORM.
  - `SQLModel`: Built by the creator of FastAPI, combines Pydantic and SQLAlchemy, includes async support.
Workshop Making API Calls Asynchronously
Project: Enhance the Task Management API by adding a new endpoint that, when retrieving a specific task, also asynchronously fetches some related dummy data from an external (mock) service using `httpx`.
Goal: Learn to use `async def` for path operations and make non-blocking external HTTP requests using `httpx`.
Steps:
- Ensure Setup: Be in the `task_manager_api` root directory with the virtual environment activated. Your code should be structured as per the previous workshop (`app/main.py`, `app/routers/tasks.py`, etc.).
- Install `httpx` (e.g., `pip install httpx` inside the active virtual environment). Add `httpx` to your `requirements.txt` file if you are maintaining one.
- Modify Task Router (`app/routers/tasks.py`): Change the `read_task_route` function to be `async def` and add logic to call an external service. We'll use `https://httpbin.org/get`, which simply echoes back request info, simulating fetching related data.

```python
# In app/routers/tasks.py
# ... (keep existing imports: APIRouter, HTTPException, Depends, List, status)
from ..schemas.tasks import Task, TaskCreate, TaskUpdate  # Keep schema imports
from ..dependencies import get_fake_db_session            # Keep dependency import
import httpx    # Import httpx
import asyncio  # Import asyncio for gather below

# Keep the router definition
router = APIRouter(
    prefix="/tasks",
    tags=["Tasks"],
    responses={404: {"description": "Task not found"}},
)

# Note: this fixed path must be declared BEFORE the parameterized
# "/{task_id}" route, otherwise "fetch-multiple" would be captured
# (and rejected) by the int path parameter.
@router.get("/fetch-multiple/")
async def fetch_multiple_urls():
    """
    Demonstrates making multiple async HTTP calls concurrently using asyncio.gather.
    """
    urls = [
        "https://httpbin.org/delay/1",  # 1 second delay
        "https://httpbin.org/delay/2",  # 2 second delay
        "https://httpbin.org/delay/1",  # 1 second delay
    ]
    async with httpx.AsyncClient() as client:
        # Create a list of coroutine objects (client.get returns a coroutine)
        tasks = [client.get(url, timeout=10.0) for url in urls]
        print(f"DEBUG: Dispatching {len(tasks)} async requests...")
        # Run the requests concurrently and wait for all to complete
        results = await asyncio.gather(*tasks, return_exceptions=True)
        print("DEBUG: All async requests completed.")

    processed_results = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            processed_results.append({"url": urls[i], "status": "error", "detail": str(result)})
            print(f"ERROR: Request to {urls[i]} failed: {result}")
        elif isinstance(result, httpx.Response):
            processed_results.append({"url": urls[i], "status": result.status_code})
            print(f"INFO: Request to {urls[i]} succeeded with status {result.status_code}")
    return {"message": "Fetched multiple URLs concurrently", "results": processed_results}

# Modified read_task_route
@router.get("/{task_id}", response_model=Task)
async def read_task_route(  # Ensure the function is async def
    task_id: int,
    db: dict = Depends(get_fake_db_session)
):
    """
    Retrieve a single task by ID.
    Also asynchronously fetches related dummy data from an external service.
    """
    print(f"DEBUG: Fetching task with ID: {task_id}")

    # --- Fetch Task from 'DB' (Placeholder) ---
    task = db.get("data_store", {}).get(task_id)
    if task is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                            detail=f"Task with ID {task_id} not found")

    # --- Asynchronously Fetch External Data ---
    async with httpx.AsyncClient() as client:
        try:
            # httpbin.org/get returns JSON about the request; we simulate
            # fetching 'user' details related to the task.
            external_url = f"https://httpbin.org/get?task_id={task_id}"
            print(f"DEBUG: Calling external service: {external_url}")
            response = await client.get(external_url, timeout=5.0)  # Add a timeout
            response.raise_for_status()  # Raise an exception for 4xx/5xx responses
            external_data = response.json()
            print(f"DEBUG: Received external data: {external_data.get('args')}")  # httpbin echoes params in 'args'
            # You could merge this external data into the response, but that would
            # require a new combined response model; we just log it here.
        except httpx.RequestError as exc:
            # Handle potential errors during the HTTP request.
            # Decide how to handle this: return the task without external data,
            # or raise a 50x error? For now, just log it and return the task.
            print(f"ERROR: HTTP request failed: {exc}")
            # raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            #                     detail=f"External service request failed: {exc}")

    # --- Return Task ---
    return task

# The remaining routes stay as they were, but for consistency make them all
# async def as well (they only touch the fake in-memory db, so they could stay
# sync; using async def everywhere is simply the simpler convention here):

@router.post("/", response_model=Task, status_code=status.HTTP_201_CREATED)
async def create_task_route(  # Ensure async def
    task_in: TaskCreate,
    db: dict = Depends(get_fake_db_session)
):
    print(f"DEBUG: Received task to create: {task_in.dict()}")
    new_task_id = len(db.get("data_store", {})) + 1
    new_task = Task(**task_in.dict(), id=new_task_id, completed=False)
    db["data_store"][new_task_id] = new_task
    print(f"DEBUG: Simulated storing task {new_task_id}")
    await asyncio.sleep(0.01)  # Simulate tiny async I/O if needed
    return new_task

@router.get("/", response_model=List[Task])
async def read_tasks_route(  # Ensure async def
    skip: int = 0,
    limit: int = 10,
    db: dict = Depends(get_fake_db_session)
):
    print(f"DEBUG: Fetching tasks with skip={skip}, limit={limit}")
    all_tasks = list(db.get("data_store", {}).values())
    await asyncio.sleep(0.01)  # Simulate tiny async I/O if needed
    return all_tasks[skip : skip + limit]

@router.put("/{task_id}", response_model=Task)
async def update_task_route(  # Ensure async def
    task_id: int,
    task_in: TaskUpdate,
    db: dict = Depends(get_fake_db_session)
):
    print(f"DEBUG: Updating task ID: {task_id}")
    task_to_update = db.get("data_store", {}).get(task_id)
    if task_to_update is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
    update_data = task_in.dict(exclude_unset=True)
    updated_task_data = task_to_update.dict()
    updated_task_data.update(update_data)
    updated_task = Task(**updated_task_data)
    db["data_store"][task_id] = updated_task
    await asyncio.sleep(0.01)  # Simulate tiny async I/O if needed
    return updated_task

@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_task_route(  # Ensure async def
    task_id: int,
    db: dict = Depends(get_fake_db_session)
):
    # ... (implementation remains the same) ...
```

Self-correction: Ensured all path operation functions in the router are `async def`, and moved the fixed `/fetch-multiple/` path above the parameterized `/{task_id}` route so it is matched first.
print(f"DEBUG: Deleting task ID: {task_id}") if task_id not in db.get("data_store", {}): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Task not found") del db["data_store"][task_id] print(f"DEBUG: Simulated deleting task {task_id}") await asyncio.sleep(0.01) # Simulate tiny async I/O if needed return None
async def
. Addedasyncio.sleep(0.01)
to simulate some async work even in the non-HTTP functions, promoting consistency. Added error handling for thehttpx
call and a timeout. Added the/fetch-multiple/
example usingasyncio.gather
. Made sure theread_task_route
correctly fetches the task first before making the external call. -
Run the Server: start Uvicorn with auto-reload as before (`uvicorn app.main:app --reload`).
- Test the Asynchronous Endpoint:
  - First, create a task using `POST /tasks` via Swagger UI (`http://127.0.0.1:8000/docs`) or `curl`, and note its ID (e.g., 1).
  - Now test the modified `GET /tasks/{task_id}` endpoint for the task you just created.
  - Using Swagger UI: expand `GET /tasks/{task_id}`, enter the task ID, and click "Execute".
  - Using `curl`: request the same URL directly, e.g. `curl http://127.0.0.1:8000/tasks/1`.
  - Observe:
    - You should receive the task details in the response as before.
    - Check the Uvicorn console output. You should see the `DEBUG` messages, including "Calling external service...", the URL called, and the "Received external data..." message showing the arguments echoed back by httpbin.org. Crucially, while the `await client.get(...)` was in progress, the server's event loop was free to handle any other requests that arrived concurrently.
- Test `GET /tasks/fetch-multiple/`: observe the output and the timing (e.g. prefix the request with the `time` command). The total execution time should be slightly over 2 seconds (the duration of the longest delay), not 1 + 2 + 1 = 4 seconds, demonstrating the concurrency achieved by `asyncio.gather`. Check the Uvicorn logs for the debug messages showing dispatch and completion.
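The timing claim above can be checked in isolation, without a running server. In the sketch below, `asyncio.sleep` stands in for the HTTP delays (scaled down to 0.1s/0.2s/0.1s so the demo runs quickly; all names here are illustrative, not part of the workshop code):

```python
import asyncio
import time

async def fake_request(delay: float) -> str:
    """Stands in for an HTTP call such as client.get(); just sleeps asynchronously."""
    await asyncio.sleep(delay)
    return f"done after {delay}s"

async def fetch_all(delays):
    # Dispatch all coroutines at once and wait for every result,
    # exactly as the endpoint does with asyncio.gather(*tasks)
    return await asyncio.gather(*(fake_request(d) for d in delays))

# Scaled-down stand-ins for the /delay/1, /delay/2, /delay/1 endpoints
start = time.perf_counter()
results = asyncio.run(fetch_all([0.1, 0.2, 0.1]))
elapsed = time.perf_counter() - start
print(results)
print(f"elapsed ~ {elapsed:.2f}s (close to the longest delay, not the 0.4s sum)")
```

Running the delays sequentially would take the sum (0.4s); `gather` finishes in roughly the maximum (0.2s), which is exactly the behaviour you should see from `/tasks/fetch-multiple/`.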
You have now successfully integrated asynchronous I/O into your FastAPI application using `async def` and `httpx`. This is essential for building responsive and scalable APIs that interact with external resources or databases efficiently.
5. Authentication and Authorization
Most real-world APIs need to control who can access them and what actions different users are allowed to perform. This involves two distinct concepts:
- Authentication: Verifying the identity of a user or client. "Who are you?"
- Authorization: Determining if the authenticated user/client has permission to perform the requested action or access the requested resource. "What are you allowed to do?"
FastAPI provides robust tools, particularly leveraging dependency injection (`Depends`) and its security utilities (`fastapi.security`), to implement various authentication and authorization schemes. We will focus on OAuth2 with JWT Bearer tokens, a common and secure standard for modern APIs.
Authentication vs Authorization
It's crucial to understand the difference:
- Authentication happens first. The client presents credentials (like a username/password, an API key, or a token). The server validates these credentials. If valid, the client is considered authenticated.
- Authorization happens after successful authentication. The server checks if the authenticated user has the necessary permissions (e.g., roles, scopes) to perform the specific operation (e.g., read data, create a task, delete a user).
Common Authentication Schemes
- HTTP Basic Auth: Client sends the username/password encoded in the `Authorization` header. Simple, but sends credentials with every request (always over HTTPS). Less common for user-facing APIs; sometimes used for service-to-service communication.
- API Keys: Client sends a pre-shared secret key, typically in a custom header (e.g., `X-API-Key`) or as a query parameter. Good for server-to-server or third-party integrations. Requires careful key management.
- OAuth2: An authorization framework (not a specific protocol) that enables applications to obtain limited access to user accounts on an HTTP service. It defines various "flows" or "grant types". Commonly used flows for API authentication include:
  - Password Flow (`password` grant type): The client (e.g., your own web/mobile app) directly exchanges the user's username and password for an access token (typically a JWT). This flow is simple but requires trusting the client application with user credentials.
  - Authorization Code Flow: A more secure flow for third-party applications. The user is redirected to the authorization server to log in and grant permission, and the server issues an authorization code back to the client. The client then exchanges this code for an access token.
  - Client Credentials Flow: Used for machine-to-machine authentication where no user is involved. The client authenticates using its own credentials (client ID/secret) to get a token.
- JWT (JSON Web Tokens): A compact, URL-safe means of representing claims to be transferred between two parties. Often used as the format for OAuth2 access tokens. JWTs are digitally signed (e.g., using HMAC or RSA/ECDSA) ensuring their integrity and authenticity. They typically contain information (claims) about the user (e.g., user ID, username, roles, expiration time) directly within the token, allowing for stateless authentication (the server doesn't need to store session state).
Implementing OAuth2 Password Flow with JWT in FastAPI
We'll implement the common scenario where our API authenticates users based on username/password and issues JWT tokens. Clients then include this token in the `Authorization: Bearer <token>` header for subsequent requests.
Key Components:
- Password Hashing: NEVER store passwords in plain text. Use a strong, salted hashing algorithm like Bcrypt or Argon2. The `passlib` library is excellent for this.
- Pydantic Models: For user data, token data, and the token request form.
- Security Scheme Definition: Use `fastapi.security.OAuth2PasswordBearer` to tell FastAPI where to look for the token (in the `Authorization` header).
- Token Endpoint (`/token`): An endpoint where clients POST username/password to receive a JWT. Uses `fastapi.security.OAuth2PasswordRequestForm`.
- JWT Utility Functions: Functions to create (encode) and decode/validate JWTs. The `python-jose` library is commonly used.
- Dependency for Current User: A dependency function (e.g., `get_current_user`) that:
  - Takes the token string provided by `OAuth2PasswordBearer`.
  - Decodes and validates the JWT (checks signature, expiration).
  - Extracts user information (e.g., username or user ID) from the token payload.
  - (Optionally) Fetches the corresponding user object from the database.
  - Returns the user object, or raises `HTTPException(status_code=401)` if the token is invalid, expired, or missing.
- Protecting Endpoints: Add `Depends(get_current_user)` to the signature of path operation functions that require authentication. The returned user object can then be used for authorization checks or for associating data with the user.
JWT Structure and Security Considerations
A JWT typically consists of three parts separated by dots (`.`): `header.payload.signature`.
- Header: Contains metadata about the token (e.g., `alg`: signing algorithm, `typ`: token type). Base64Url encoded.
- Payload: Contains the claims (statements about the user and token). Includes registered claims (like `iss`: issuer, `exp`: expiration time, `sub`: subject/user ID), public claims, and private claims. Base64Url encoded. Payload data is readable by anyone, so don't put sensitive information here unless it is encrypted (JWE, which is less common than JWS).
- Signature: Created by signing the encoded header and payload using a secret key (HMAC) or a private key (RSA/ECDSA). Ensures the token hasn't been tampered with.
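To make this three-part structure concrete, here is a dependency-free sketch that builds and verifies an HS256 token using only the standard library. The workshop code uses `python-jose` for this instead; the secret and claims below are purely illustrative:

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    """Base64Url encoding without padding, as used by JWS."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def make_jwt(payload: dict, secret: bytes) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    # The signing input is exactly: b64url(header) + "." + b64url(payload)
    signing_input = f"{b64url(json.dumps(header).encode())}.{b64url(json.dumps(payload).encode())}"
    signature = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"

def verify_jwt(token: str, secret: bytes) -> dict:
    signing_input, _, sig = token.rpartition(".")
    expected = b64url(hmac.new(secret, signing_input.encode(), hashlib.sha256).digest())
    if not hmac.compare_digest(sig, expected):
        raise ValueError("invalid signature")
    # The payload is just Base64Url-encoded JSON: readable by anyone,
    # which is why secrets must never be placed in it.
    payload_b64 = signing_input.split(".")[1]
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))

secret = b"your-super-secret-key"  # illustrative only; use a real secret from the environment
token = make_jwt({"sub": "testuser", "exp": 2000000000}, secret)
print(token.count("."))              # two dots -> three parts
print(verify_jwt(token, secret))     # the decoded claims
```

Changing a single character of the token makes verification fail, which is the integrity guarantee the signature provides.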
Security Best Practices:
- Use a strong, secret key (for HMAC) or key pair (for RSA/ECDSA). Keep secrets secure (e.g., environment variables, secrets manager).
- Always validate the signature.
- Always validate the expiration time (`exp` claim).
- Validate other relevant claims (`iss`: issuer, `aud`: audience) if used.
- Use HTTPS exclusively to prevent tokens from being intercepted.
- Keep token lifetimes reasonably short and implement refresh token strategies if needed for longer user sessions.
- Implement token revocation mechanisms if necessary (more complex for stateless JWTs, might require a blacklist).
Workshop Securing the Task Management API with JWT
Project: Add user authentication to the Task Management API. Users must log in (`/token`) to get a JWT. Task endpoints will require this JWT for access, and tasks will be associated with the logged-in user.
Goal: Implement OAuth2 Password Flow with JWT Bearer tokens, protect task endpoints, and associate tasks with users.
Steps:
- Ensure Setup: Be in the `task_manager_api` root directory with the structured application (`app/main.py`, etc.) and the virtual environment activated.
- Install Dependencies: `pip install "python-jose[cryptography]" "passlib[bcrypt]"`
  - `python-jose`: for JWT creation and validation; the `[cryptography]` extra includes the necessary crypto backends.
  - `passlib`: for password hashing; the `[bcrypt]` extra includes the recommended bcrypt algorithm.
  - Add these to `requirements.txt`.
- Define User and Token Schemas (`app/schemas/users.py`, `app/schemas/tokens.py`):
  - Create `app/schemas/users.py`:

    ```python
    # In app/schemas/users.py
    from pydantic import BaseModel, EmailStr, Field
    from typing import Optional

    class UserBase(BaseModel):
        username: str = Field(..., min_length=3, max_length=50)
        email: Optional[EmailStr] = None  # Use Pydantic's EmailStr for validation
        full_name: Optional[str] = None
        disabled: Optional[bool] = False

    class UserCreate(UserBase):
        password: str = Field(..., min_length=8)  # Password required on creation

    class UserUpdate(UserBase):
        username: Optional[str] = Field(None, min_length=3, max_length=50)
        password: Optional[str] = Field(None, min_length=8)  # Allow password update

    # The DB representation stores hashed_password, never the plain password
    class UserInDB(UserBase):
        id: int
        hashed_password: str

        class Config:
            orm_mode = True  # For later DB integration

    # User model returned by the API (never includes the password hash)
    class User(UserBase):
        id: int

        class Config:
            orm_mode = True
    ```

  - Create `app/schemas/tokens.py`:

    ```python
    # In app/schemas/tokens.py
    from pydantic import BaseModel
    from typing import Optional

    class Token(BaseModel):
        access_token: str
        token_type: str  # Typically "bearer"

    class TokenData(BaseModel):
        # Data embedded within the JWT payload (e.g., username or user ID)
        username: Optional[str] = None
        # You might add other fields like scopes or roles here
    ```

  - Create `app/schemas/__init__.py` if it doesn't exist, so that `app/schemas` is importable as a package.
- Create Security Utilities (`app/security.py`): This file will contain password hashing, JWT creation/decoding logic, and the dependency functions to get the current user.

  ```python
  # In app/security.py
  from datetime import datetime, timedelta, timezone
  from typing import Optional

  from fastapi import Depends, HTTPException, status
  from fastapi.security import OAuth2PasswordBearer
  from jose import JWTError, jwt
  from passlib.context import CryptContext

  # Import user and token schemas (adjust the path if needed)
  from .schemas.users import User, UserInDB  # User model for return, UserInDB for lookup
  from .schemas.tokens import TokenData

  # --- Configuration ---
  # Use environment variables in a real app!
  SECRET_KEY = "your-super-secret-key"  # Keep this secret and complex!
  ALGORITHM = "HS256"  # HMAC SHA-256
  ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Token validity period

  # --- Password Hashing ---
  pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

  def verify_password(plain_password: str, hashed_password: str) -> bool:
      """Verifies a plain password against a hashed password."""
      return pwd_context.verify(plain_password, hashed_password)

  def get_password_hash(password: str) -> str:
      """Hashes a plain password."""
      return pwd_context.hash(password)

  # --- JWT Handling ---
  def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
      """Creates a JWT access token."""
      to_encode = data.copy()
      if expires_delta:
          expire = datetime.now(timezone.utc) + expires_delta
      else:
          # Default expiration time
          expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
      to_encode.update({"exp": expire})
      # Ensure 'sub' (subject) is present, typically the username or user ID
      if "sub" not in to_encode:
          raise ValueError("Subject ('sub') claim missing in token data")
      encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
      return encoded_jwt

  def decode_access_token(token: str) -> Optional[TokenData]:
      """Decodes and validates a JWT access token."""
      try:
          payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
          username: Optional[str] = payload.get("sub")  # 'sub' usually holds the username/ID
          if username is None:
              raise JWTError("Subject claim ('sub') missing in token")
          token_data = TokenData(username=username)
          # python-jose already rejects expired tokens; this explicit check is belt-and-braces
          if "exp" in payload:
              expiration_time = datetime.fromtimestamp(payload["exp"], tz=timezone.utc)
              if expiration_time < datetime.now(timezone.utc):
                  raise JWTError("Token has expired")
          # Add more validation here if necessary (e.g., audience, issuer)
          return token_data
      except JWTError as e:
          print(f"JWT Error: {e}")  # Log the error
          return None  # Indicate validation failure

  # --- OAuth2 Scheme Definition ---
  # Tells FastAPI where to find the token (Authorization: Bearer <token>).
  # tokenUrl should point to your token endpoint (relative path).
  oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")

  # --- Dependency to Get Current User ---
  # Placeholder 'database' (replace with actual DB logic later)
  fake_users_db = {
      "testuser": {
          "username": "testuser",
          "full_name": "Test User",
          "email": "test@example.com",
          "hashed_password": get_password_hash("password123"),  # Store the hash, never the password
          "disabled": False,
          "id": 1,
      }
  }

  def get_user_from_db(username: str) -> Optional[UserInDB]:
      """Simulates fetching user data from the DB."""
      if username in fake_users_db:
          user_dict = fake_users_db[username]
          return UserInDB(**user_dict)  # Convert dict to Pydantic model
      return None

  async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
      """
      Dependency function to get the current user from the JWT token.
      1. Decodes the token.
      2. Validates the token data.
      3. Retrieves the user from the DB based on the token subject (username).
      Returns the User Pydantic model (excluding the password hash).
      Raises HTTPException 401 if authentication fails.
      """
      credentials_exception = HTTPException(
          status_code=status.HTTP_401_UNAUTHORIZED,
          detail="Could not validate credentials",
          headers={"WWW-Authenticate": "Bearer"},  # Standard header for 401
      )
      token_data = decode_access_token(token)
      if token_data is None or token_data.username is None:
          raise credentials_exception
      # Fetch the user based on the username stored in the token's 'sub' claim
      user_in_db = get_user_from_db(username=token_data.username)
      if user_in_db is None:
          raise credentials_exception
      # Convert the DB model (UserInDB) to the API model (User)
      user = User.from_orm(user_in_db)
      return user

  async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
      """
      Builds on get_current_user and checks that the user is active (not disabled).
      Use this dependency in endpoints requiring active, authenticated users.
      """
      if current_user.disabled:
          raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
      return current_user
  ```

  Self-correction: Added timezone handling for JWT expiration (`datetime.now(timezone.utc)`). Ensured the `sub` claim is mandatory for token creation. Added the `get_user_from_db` simulation. Changed `get_current_user` to return the `User` model (safe for API responses) instead of `UserInDB`. Created `get_current_active_user` for convenience. Use environment variables for `SECRET_KEY` in production!
- Create Authentication Router (`app/routers/auth.py`): This router will handle the `/token` endpoint.

  ```python
  # In app/routers/auth.py
  from fastapi import APIRouter, Depends, HTTPException, status
  from fastapi.security import OAuth2PasswordRequestForm
  from datetime import timedelta

  # Import security functions and schemas
  from ..security import (
      verify_password,
      create_access_token,
      ACCESS_TOKEN_EXPIRE_MINUTES,
      get_user_from_db,  # To authenticate the user
  )
  from ..schemas.tokens import Token
  from ..schemas.users import User  # To potentially return user info

  router = APIRouter(tags=["Authentication"])

  @router.post("/token", response_model=Token)
  async def login_for_access_token(
      form_data: OAuth2PasswordRequestForm = Depends()
  ):
      """
      OAuth2 Password Flow: authenticates the user and returns a JWT access token.
      Expects 'username' and 'password' as form data (not JSON).
      """
      # 1. Find the user in the DB by the username provided in form_data
      user = get_user_from_db(username=form_data.username)
      # 2. Check that the user exists and the password is correct
      if not user or not verify_password(form_data.password, user.hashed_password):
          raise HTTPException(
              status_code=status.HTTP_401_UNAUTHORIZED,
              detail="Incorrect username or password",
              headers={"WWW-Authenticate": "Bearer"},
          )
      # 3. Check that the user is active (optional, but good practice)
      if user.disabled:
          raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
      # 4. Create the access token; 'sub' should uniquely identify the user
      access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
      access_token = create_access_token(
          data={"sub": user.username}, expires_delta=access_token_expires
      )
      # 5. Return the token
      return {"access_token": access_token, "token_type": "bearer"}
  ```

  Self-correction: Correctly uses `OAuth2PasswordRequestForm`, which expects form data (`application/x-www-form-urlencoded`), not JSON. Added a check for disabled users.
- Include Auth Router in Main App (`app/main.py`):

  ```python
  # In app/main.py
  from fastapi import FastAPI

  # Import routers
  from .routers import tasks, auth  # Add the auth router

  app = FastAPI(
      title="Secure Task Management API",
      description="API with JWT authentication.",
      version="0.4.0",
  )

  # Include routers
  app.include_router(auth.router)   # Add the auth router
  app.include_router(tasks.router)  # Keep the tasks router

  @app.get("/")
  async def read_root():
      return {"message": "Welcome to the Secure Task Management API!"}

  # Optional: an endpoint to test authentication
  from .schemas.users import User
  from .security import get_current_active_user
  from fastapi import Depends

  @app.get("/users/me", response_model=User, tags=["Users"])
  async def read_users_me(current_user: User = Depends(get_current_active_user)):
      """Fetch the details of the currently authenticated user."""
      return current_user
  ```

  Self-correction: Added the `/users/me` endpoint as a useful way to test whether authentication is working and to retrieve the current user's info.
- Protect Task Endpoints (`app/routers/tasks.py`): Modify the task router to require authentication using the `get_current_active_user` dependency, and associate tasks with the user ID.

  ```python
  # In app/routers/tasks.py
  # ... (keep existing imports)
  from ..schemas.users import User  # Import the User schema
  from ..security import get_current_active_user  # Import the dependency

  # Keep the router definition
  router = APIRouter(
      prefix="/tasks",
      tags=["Tasks"],
      responses={404: {"description": "Task not found"}},
      # Option 1: router-level dependency if ALL routes require auth
      # dependencies=[Depends(get_current_active_user)]
  )

  # --- Modify Storage ---
  # NOTE: This in-memory storage IS NOT SUITABLE FOR PRODUCTION OR REAL USE.
  # It is purely for demonstration within the workshop constraints and will
  # be replaced entirely when we add a real database.
  user_tasks_db = {}  # {user_id: {task_id: Task}}
  next_task_id_global = 1

  # --- Modify Path Operations ---
  # Add the current_user dependency and use current_user.id

  @router.post("/", response_model=Task, status_code=status.HTTP_201_CREATED)
  async def create_task_route(
      task_in: TaskCreate,
      # Option 2: add the dependency per route (more explicit)
      current_user: User = Depends(get_current_active_user)
  ):
      """Create a new task for the currently authenticated user."""
      global next_task_id_global
      print(f"DEBUG: User {current_user.username} (ID: {current_user.id}) creating task: {task_in.dict()}")
      new_task_id = next_task_id_global
      # Associate the task with the user; a real model would carry an owner_id field
      new_task = Task(**task_in.dict(), id=new_task_id, completed=False)
      if current_user.id not in user_tasks_db:
          user_tasks_db[current_user.id] = {}
      user_tasks_db[current_user.id][new_task_id] = new_task
      next_task_id_global += 1
      print(f"DEBUG: Stored task {new_task_id} for user {current_user.id}")
      return new_task

  @router.get("/", response_model=List[Task])
  async def read_tasks_route(
      skip: int = 0,
      limit: int = 10,
      current_user: User = Depends(get_current_active_user)
  ):
      """Retrieve tasks for the currently authenticated user."""
      print(f"DEBUG: User {current_user.username} (ID: {current_user.id}) fetching tasks")
      user_specific_tasks = user_tasks_db.get(current_user.id, {})
      tasks_list = list(user_specific_tasks.values())
      return tasks_list[skip : skip + limit]

  @router.get("/{task_id}", response_model=Task)
  async def read_task_route(
      task_id: int,
      current_user: User = Depends(get_current_active_user)
  ):
      """
      Retrieve a specific task by ID, ensuring it belongs to the current user.
      (The async external-call example is removed here to keep the focus on auth.)
      """
      print(f"DEBUG: User {current_user.username} (ID: {current_user.id}) fetching task {task_id}")
      user_specific_tasks = user_tasks_db.get(current_user.id, {})
      task = user_specific_tasks.get(task_id)
      if task is None:
          # Returning 404 rather than 403 avoids revealing whether the task
          # exists for another user.
          raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
                              detail=f"Task with ID {task_id} not found")
      return task

  @router.put("/{task_id}", response_model=Task)
  async def update_task_route(
      task_id: int,
      task_in: TaskUpdate,
      current_user: User = Depends(get_current_active_user)
  ):
      """Update a task belonging to the current user."""
      print(f"DEBUG: User {current_user.username} (ID: {current_user.id}) updating task {task_id}")
      user_specific_tasks = user_tasks_db.get(current_user.id, {})
      task_to_update = user_specific_tasks.get(task_id)
      if task_to_update is None:
          raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found for update")
      update_data = task_in.dict(exclude_unset=True)
      updated_task_data = task_to_update.dict()
      updated_task_data.update(update_data)
      updated_task = Task(**updated_task_data)  # Recreate the task model instance
      user_tasks_db[current_user.id][task_id] = updated_task
      print(f"DEBUG: Updated task {task_id} for user {current_user.id}")
      return updated_task

  @router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
  async def delete_task_route(
      task_id: int,
      current_user: User = Depends(get_current_active_user)
  ):
      """Delete a task belonging to the current user."""
      print(f"DEBUG: User {current_user.username} (ID: {current_user.id}) deleting task {task_id}")
      user_specific_tasks = user_tasks_db.get(current_user.id, {})
      if task_id not in user_specific_tasks:
          raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found for deletion")
      del user_tasks_db[current_user.id][task_id]
      print(f"DEBUG: Deleted task {task_id} for user {current_user.id}")
      return None
  ```

  Self-correction: Replaced the placeholder `db` dependency with direct use of `get_current_active_user`. Introduced a temporary global dictionary `user_tasks_db` to simulate user-specific storage (bad practice, but it avoids premature database setup). Modified all task endpoints to check ownership and use `current_user.id`, and added print statements showing which user is performing each action. Authorization logic: users can only access and modify their own tasks. Removed the fake DB dependency injection from the task routes, as it is superseded by the user-specific logic for now. A better approach would be to add an `owner_id` field to the `Task` Pydantic model and the future database model.
- Run and Test:
  - Run the server: `uvicorn app.main:app --reload --port 8000`
  - Open Swagger UI: `http://127.0.0.1:8000/docs`
  - Authorize:
    - Notice the "Authorize" button at the top right. Click it.
    - A popup appears for "Available authorizations". Under "Password flow (oauth2-password)", enter the username (`testuser`) and password (`password123`) defined in `fake_users_db`.
    - Click "Authorize" and then "Close". Swagger UI will now automatically include the obtained Bearer token in the headers of subsequent requests.
  - Test `/users/me`: execute the `GET /users/me` endpoint. It should return the details for `testuser`.
  - Test Task Endpoints (Authenticated):
    - Try `POST /tasks` to create a task. It should succeed (201 Created).
    - Try `GET /tasks` to list tasks. You should see the task you created.
    - Try `GET /tasks/{task_id}` for the created task ID. It should succeed.
    - Try `PUT /tasks/{task_id}` to update the task. It should succeed.
    - Try `DELETE /tasks/{task_id}` to delete the task. It should succeed (204 No Content).
  - Test Task Endpoints (Unauthenticated):
    - Click "Authorize" again and click "Logout" to clear the token.
    - Try accessing any of the `/tasks` endpoints again. You should now receive a `401 Unauthorized` error with detail "Not authenticated" (or similar, depending on how `OAuth2PasswordBearer` handles missing vs. invalid tokens).
  - Test with `curl`:
    - Get Token: POST the credentials as form data to `/token` and copy the `access_token` value from the JSON response.
    - Use Token (replace `<COPIED_TOKEN>`):

      ```bash
      TOKEN="<COPIED_TOKEN>"
      # Get current user
      curl -X GET "http://127.0.0.1:8000/users/me" -H "Authorization: Bearer $TOKEN" | jq
      # Create a task
      curl -X POST "http://127.0.0.1:8000/tasks/" \
        -H "Authorization: Bearer $TOKEN" \
        -H "Content-Type: application/json" \
        -d '{"title": "Curl Task", "description": "Created via curl with auth"}' | jq
      # List tasks
      curl -X GET "http://127.0.0.1:8000/tasks/" -H "Authorization: Bearer $TOKEN" | jq
      ```
You have now successfully secured your API using OAuth2 Password Flow and JWT Bearer tokens. Users must authenticate to access task endpoints, and task operations are restricted to the authenticated user's own data. This provides a solid foundation for building secure, multi-user APIs.
6. Databases and ORMs
So far, our Task Management API has stored data in memory (first a global list/dict, then a slightly more structured global dict per user). This approach has major drawbacks:
- Data Loss: All data is lost when the server restarts.
- Scalability: Cannot handle large amounts of data efficiently. Not suitable for multiple server processes/instances.
- Concurrency Issues: Modifying global dictionaries in an async application without proper locking can lead to race conditions (though less likely with FastAPI's handling of sync code in threadpools and careful async code).
Real-world applications require persistent storage, typically using a database. We'll focus on integrating SQL databases (like PostgreSQL or SQLite) using SQLAlchemy, a powerful and widely used Object-Relational Mapper (ORM) for Python.
Why Use a Database?
- Persistence: Data survives server restarts.
- Scalability: Databases are designed to handle large datasets and high query loads.
- Data Integrity: Databases enforce constraints (e.g., unique keys, foreign keys, data types).
- Concurrency Control: Databases manage concurrent access and modifications safely.
- Querying Power: SQL provides a standard, powerful language for retrieving and manipulating data.
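The persistence point can be seen with nothing more than Python's built-in `sqlite3` module, before any ORM enters the picture. This is a minimal sketch (the table and file path are throwaway illustrations): data written through one connection is still there after the connection is closed and reopened, unlike our in-memory dict storage.

```python
# Minimal persistence demo with the standard-library sqlite3 module.
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "demo.db")

# First "server run": create a table and insert a task.
conn = sqlite3.connect(db_path)
conn.execute("CREATE TABLE IF NOT EXISTS tasks (id INTEGER PRIMARY KEY, title TEXT NOT NULL)")
conn.execute("INSERT INTO tasks (title) VALUES (?)", ("Persisted task",))
conn.commit()
conn.close()  # Simulates a server shutdown.

# Second "server run": the data survives the restart.
conn = sqlite3.connect(db_path)
rows = conn.execute("SELECT title FROM tasks").fetchall()
conn.close()
print(rows)  # -> [('Persisted task',)]
```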
SQL vs NoSQL
- SQL (Relational Databases): Examples: PostgreSQL, MySQL, SQLite, SQL Server, Oracle.
- Store data in tables with predefined schemas (columns with specific data types).
- Relationships between tables are defined using keys (primary, foreign).
- Use SQL (Structured Query Language) for interaction.
- Strong consistency (ACID properties: Atomicity, Consistency, Isolation, Durability).
- Best suited for structured data where relationships and data integrity are critical.
- NoSQL (Non-Relational Databases): Examples: MongoDB (Document), Cassandra (Column-family), Redis (Key-value), Neo4j (Graph).
- More flexible data models (documents, key-value pairs, graphs, etc.).
- Schemas can be dynamic or non-existent.
- Often prioritize availability and partition tolerance over strong consistency (BASE properties).
- Can scale horizontally more easily for certain workloads.
- Best suited for unstructured or semi-structured data, large volumes, high throughput requirements, or specific data model needs (like graphs).
FastAPI works well with both types. We'll focus on SQL databases with SQLAlchemy due to their prevalence and the robust features SQLAlchemy offers.
SQLAlchemy Overview
SQLAlchemy provides two main components:
- Core (SQL Expression Language): A Pythonic way to generate SQL statements. You work with table metadata and Python expressions that map directly to SQL constructs. Gives fine-grained control over the generated SQL.
- ORM (Object-Relational Mapper): Allows you to map Python classes (your "models") to database tables. You interact with database rows as instances of your Python classes, and the ORM handles translating these operations into SQL queries. This provides a higher level of abstraction.
We will primarily use the SQLAlchemy ORM as it often leads to more idiomatic Python code when combined with frameworks like FastAPI and Pydantic.
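To make the Core/ORM distinction concrete, here is a small sketch of the Core SQL Expression Language against an in-memory SQLite database. The `tasks` table is purely illustrative, and the sketch assumes SQLAlchemy 1.4+ (new-style `insert()`/`select()`): you work with table metadata and expression objects rather than mapped classes.

```python
from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, insert, select)

engine = create_engine("sqlite:///:memory:")
metadata = MetaData()

# Core works with table metadata, not mapped Python classes.
tasks = Table(
    "tasks", metadata,
    Column("id", Integer, primary_key=True),
    Column("title", String, nullable=False),
)
metadata.create_all(engine)

# engine.begin() opens a transaction and commits it automatically on success.
with engine.begin() as conn:
    conn.execute(insert(tasks).values(title="Core task"))

# Expression objects map directly to SQL constructs.
with engine.connect() as conn:
    titles = [row.title for row in conn.execute(select(tasks))]

print(titles)  # ['Core task']
```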
Key SQLAlchemy ORM Concepts:
- Engine: Represents the connection source to the database (manages the connection pool). Created once per application using `create_engine`.
- Session: The primary interface for interacting with the database via the ORM. Represents a "transaction" or unit of work. You create session objects, perform operations (add, query, delete objects), and then commit or roll back the changes. `sessionmaker` is used to create a factory for sessions.
- Declarative Base: A base class from which your ORM models inherit. It maps the Python class to a database table.
- ORM Models: Python classes inheriting from the declarative base, representing database tables. Class attributes (using `Column`) define the table columns.
- Relationships: Define relationships between models (one-to-one, one-to-many, many-to-many) using `relationship()`.
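The "unit of work" idea is easiest to see with `commit()` and `rollback()` side by side. A self-contained sketch (the `Note` model is invented for illustration, assuming SQLAlchemy 1.4+): changes made through a session stay pending until committed, and `rollback()` throws pending changes away.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

engine = create_engine("sqlite:///:memory:")       # Engine: one per application
SessionLocal = sessionmaker(bind=engine)           # Session factory
Base = declarative_base()                          # Declarative Base

class Note(Base):                                  # ORM model (illustrative)
    __tablename__ = "notes"
    id = Column(Integer, primary_key=True)
    text = Column(String, nullable=False)

Base.metadata.create_all(engine)

db = SessionLocal()
db.add(Note(text="kept"))
db.commit()                                        # Persisted to the database

db.add(Note(text="discarded"))
db.rollback()                                      # Pending change thrown away

texts = [n.text for n in db.query(Note).all()]
db.close()
print(texts)  # ['kept']
```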
Setting Up Database Connection
1. Install SQLAlchemy and Driver:
   - SQLAlchemy: `pip install sqlalchemy`
   - Database Driver (choose one based on your database):
     - PostgreSQL: `pip install psycopg2-binary` (easier install) or `pip install psycopg` (newer, supports async)
     - MySQL: `pip install mysqlclient` or `pip install pymysql`
     - SQLite: Included with Python (`sqlite3`), no separate install needed.
2. Database URL: SQLAlchemy connects using a database URL string, typically formatted as `dialect+driver://username:password@host:port/database`:
   - PostgreSQL: `postgresql+psycopg2://user:password@host:port/dbname`
   - MySQL: `mysql+pymysql://user:password@host:port/dbname`
   - SQLite: `sqlite:///./path/to/database.db` (relative path), `sqlite:////absolute/path/to/database.db` (absolute path), or `sqlite:///:memory:` (in-memory database, non-persistent).
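Since credentials should never be hard-coded, the URL is usually assembled from environment variables. A hedged sketch — the `DB_*` variable names are illustrative, not a FastAPI or SQLAlchemy convention:

```python
import os

def build_database_url() -> str:
    """Assemble a SQLAlchemy URL from the environment, falling back to SQLite."""
    user = os.getenv("DB_USER")
    password = os.getenv("DB_PASSWORD")
    host = os.getenv("DB_HOST", "localhost")
    port = os.getenv("DB_PORT", "5432")
    name = os.getenv("DB_NAME")
    if user and password and name:
        return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{name}"
    return "sqlite:///./sql_app.db"  # Development fallback

print(build_database_url())
```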
3. Create Engine and SessionLocal: Create a central file (e.g., `app/db/session.py`) to manage the engine and session factory.

   ```python
   # In app/db/session.py
   from sqlalchemy import create_engine
   from sqlalchemy.orm import sessionmaker
   from sqlalchemy.ext.declarative import declarative_base

   # Replace with your actual database URL (use environment variables!)
   # For the workshop, we'll use SQLite for simplicity.
   SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
   # Example for PostgreSQL:
   # SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

   # Create the SQLAlchemy engine
   # connect_args is needed only for SQLite to allow multi-threaded access
   engine = create_engine(
       SQLALCHEMY_DATABASE_URL,
       connect_args={"check_same_thread": False}  # Only needed for SQLite
   )

   # Create a SessionLocal class - each instance will be a database session
   SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

   # Create a Base class for our ORM models to inherit from
   Base = declarative_base()

   # Dependency to get DB session (moved here from dependencies.py)
   def get_db():
       db = SessionLocal()
       try:
           yield db  # Provide the session to the path operation function
       finally:
           db.close()  # Ensure the session is closed after the request
   ```

   Self-correction: Moved the `get_db` dependency here as it's directly related to session management. Ensured `connect_args` for SQLite. `autoflush=False` is generally recommended with FastAPI/async frameworks.
Defining SQLAlchemy Models
Create ORM models corresponding to your data structures (e.g., Users, Tasks) in a file like `app/models/task.py` or `app/db/models.py`.
```python
# In app/models/task.py (or similar)
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, Date
from sqlalchemy.orm import relationship

# Import the Base from your session setup
from ..db.session import Base

class User(Base):  # Assuming a User model is also needed
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True, nullable=False)
    email = Column(String, unique=True, index=True, nullable=True)
    full_name = Column(String, nullable=True)
    hashed_password = Column(String, nullable=False)
    disabled = Column(Boolean, default=False)

    # Relationship: A user can have many tasks
    tasks = relationship("Task", back_populates="owner")

class Task(Base):
    __tablename__ = "tasks"  # The name of the table in the database

    # Define columns
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True, nullable=False)
    description = Column(String, nullable=True)
    completed = Column(Boolean, default=False, nullable=False)
    due_date = Column(Date, nullable=True)  # Use Date type for dates

    # Foreign Key relationship to the 'users' table
    owner_id = Column(Integer, ForeignKey("users.id"), nullable=False)

    # Relationship: A task belongs to one owner (User)
    # 'back_populates' links this relationship to the 'tasks' relationship in the User model
    owner = relationship("User", back_populates="tasks")
```

Self-correction: Added a `User` model as tasks need an owner. Defined relationships using `relationship` and `back_populates`. Added `nullable=False` where appropriate. Used the `Date` type for `due_date`.
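Once mapped, `relationship()` with `back_populates` keeps both sides of the link in sync. The following standalone sketch (compressed versions of the models above, with an in-memory database, assuming SQLAlchemy 1.4+) shows that appending to `user.tasks` sets the task's `owner_id` automatically, and `task.owner` navigates back the other way:

```python
from datetime import date

from sqlalchemy import (Boolean, Column, Date, ForeignKey, Integer, String,
                        create_engine)
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String, unique=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    tasks = relationship("Task", back_populates="owner")

class Task(Base):
    __tablename__ = "tasks"
    id = Column(Integer, primary_key=True)
    title = Column(String, nullable=False)
    completed = Column(Boolean, default=False, nullable=False)
    due_date = Column(Date, nullable=True)
    owner_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    owner = relationship("User", back_populates="tasks")

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
db = sessionmaker(bind=engine)()

alice = User(username="alice", hashed_password="<hash>")
# Appending via the relationship fills in owner_id at flush time.
alice.tasks.append(Task(title="Write docs", due_date=date(2030, 1, 1)))
db.add(alice)  # Cascades to the appended Task as well
db.commit()

task = db.query(Task).first()
print(task.owner.username, task.owner_id)  # Both directions of the link work
```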
Performing CRUD Operations
CRUD (Create, Read, Update, Delete) operations are performed using the SQLAlchemy `Session` object. You typically encapsulate this logic in dedicated CRUD functions (e.g., in `app/crud/crud_task.py`).
```python
# In app/crud/crud_task.py
from sqlalchemy.orm import Session
from typing import List, Optional

# Import models and schemas
from ..models.task import Task as TaskModel  # SQLAlchemy model aliased
from ..models.user import User as UserModel  # SQLAlchemy model aliased
from ..schemas.tasks import TaskCreate, TaskUpdate  # Pydantic schemas

def get_task(db: Session, task_id: int, owner_id: int) -> Optional[TaskModel]:
    """Get a single task by ID, ensuring ownership."""
    return db.query(TaskModel).filter(TaskModel.id == task_id, TaskModel.owner_id == owner_id).first()

def get_tasks(db: Session, owner_id: int, skip: int = 0, limit: int = 100) -> List[TaskModel]:
    """Get a list of tasks for a specific owner with pagination."""
    return db.query(TaskModel).filter(TaskModel.owner_id == owner_id).offset(skip).limit(limit).all()

def create_task(db: Session, task: TaskCreate, owner_id: int) -> TaskModel:
    """Create a new task in the database."""
    # Create a SQLAlchemy model instance from the Pydantic schema data
    db_task = TaskModel(
        **task.dict(),      # Unpack Pydantic data
        owner_id=owner_id,  # Set the owner ID
        completed=False     # Ensure default completed status if not in TaskCreate
    )
    db.add(db_task)      # Add the new object to the session
    db.commit()          # Commit the transaction to save to DB
    db.refresh(db_task)  # Refresh the object to get DB-generated values (like ID)
    return db_task

def update_task(db: Session, task_id: int, task_update: TaskUpdate, owner_id: int) -> Optional[TaskModel]:
    """Update an existing task."""
    db_task = get_task(db=db, task_id=task_id, owner_id=owner_id)  # Fetch task, ensuring ownership
    if not db_task:
        return None  # Task not found or doesn't belong to user
    # Get update data, excluding unset fields from Pydantic model
    update_data = task_update.dict(exclude_unset=True)
    # Update the SQLAlchemy model fields
    for key, value in update_data.items():
        setattr(db_task, key, value)
    db.add(db_task)      # Add the updated object back to the session (marks it as dirty)
    db.commit()          # Commit changes
    db.refresh(db_task)  # Refresh to get updated state if needed
    return db_task

def delete_task(db: Session, task_id: int, owner_id: int) -> Optional[TaskModel]:
    """Delete a task."""
    db_task = get_task(db=db, task_id=task_id, owner_id=owner_id)  # Fetch task, ensuring ownership
    if not db_task:
        return None  # Task not found or doesn't belong to user
    db.delete(db_task)  # Mark the object for deletion
    db.commit()         # Commit the deletion
    return db_task      # Return the deleted object (optional)
```

Self-correction: Added `owner_id` checks to all CRUD functions to enforce authorization at the database interaction level. Used `setattr` for flexible updates based on the `TaskUpdate` schema. Ensured `commit()` and `refresh()` are called appropriately.
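The `exclude_unset=True` detail in `update_task` is what makes partial updates safe: fields the client never sent are skipped entirely, rather than being overwritten with `None`. A standalone sketch of just that mechanism (the schema is compressed, `FakeRow` stands in for a SQLAlchemy row, and a small shim covers both Pydantic v1's `.dict()` and v2's `.model_dump()`):

```python
from typing import Optional
from pydantic import BaseModel

class TaskUpdate(BaseModel):
    title: Optional[str] = None
    description: Optional[str] = None
    completed: Optional[bool] = None

class FakeRow:  # Stand-in for a SQLAlchemy Task row
    def __init__(self):
        self.title, self.description, self.completed = "Old", "Keep me", False

def apply_update(row, update: TaskUpdate) -> None:
    # model_dump() is Pydantic v2; dict() is v1 — use whichever exists.
    dump = update.model_dump if hasattr(update, "model_dump") else update.dict
    # exclude_unset=True: only fields the client actually provided.
    for key, value in dump(exclude_unset=True).items():
        setattr(row, key, value)

row = FakeRow()
apply_update(row, TaskUpdate(title="New"))  # Client sent only 'title'
print(row.title, row.description)  # 'description' is untouched
```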
Integrating with FastAPI using Dependencies
Now, update the API router (`app/routers/tasks.py`) to use these CRUD functions and the real database session dependency (`get_db`).
1. Create Database Tables: Before running the app for the first time, you need to create the tables defined in your models. Add this to your main application file (`app/main.py`) or a separate script.

   ```python
   # In app/main.py (add these imports and lines)
   from .db import session, models  # Import Base and engine

   # Create database tables if they don't exist
   # This is simple for development; use Alembic for production migrations
   models.Base.metadata.create_all(bind=session.engine)

   app = FastAPI(...)  # Keep your app definition
   # ... include routers ...
   ```
2. Update Task Router (`app/routers/tasks.py`):

   ```python
   # In app/routers/tasks.py
   from fastapi import APIRouter, HTTPException, Depends, status
   from typing import List
   from sqlalchemy.orm import Session  # Import Session

   # Import schemas, models (optional here if CRUD returns models), crud functions, security
   from .. import schemas  # Access via schemas.tasks.Task, etc.
   from .. import crud  # Access via crud.task.get_task, etc.
   from ..db.session import get_db  # Import the REAL DB session dependency
   from ..security import get_current_active_user  # Keep auth dependency
   from ..models.task import Task as TaskModel  # Import SQLAlchemy model if needed

   router = APIRouter(
       prefix="/tasks",
       tags=["Tasks"],
       responses={404: {"description": "Task not found"}},
   )

   # Inject BOTH the DB session and the current user
   @router.post("/", response_model=schemas.tasks.Task, status_code=status.HTTP_201_CREATED)
   async def create_task_route_db(  # Renamed for clarity
       task_in: schemas.tasks.TaskCreate,
       db: Session = Depends(get_db),  # Inject DB Session
       current_user: schemas.users.User = Depends(get_current_active_user)  # Inject User
   ):
       """Create a new task in the database for the current user."""
       # Call the CRUD function to handle DB interaction
       db_task = crud.task.create_task(db=db, task=task_in, owner_id=current_user.id)
       # Pydantic's response_model handles conversion from SQLAlchemy model if orm_mode=True
       return db_task

   @router.get("/", response_model=List[schemas.tasks.Task])
   async def read_tasks_route_db(
       skip: int = 0,
       limit: int = 10,
       db: Session = Depends(get_db),
       current_user: schemas.users.User = Depends(get_current_active_user)
   ):
       """Retrieve tasks from the database for the current user."""
       tasks = crud.task.get_tasks(db=db, owner_id=current_user.id, skip=skip, limit=limit)
       return tasks

   @router.get("/{task_id}", response_model=schemas.tasks.Task)
   async def read_task_route_db(
       task_id: int,
       db: Session = Depends(get_db),
       current_user: schemas.users.User = Depends(get_current_active_user)
   ):
       """Retrieve a specific task by ID from the database, ensuring it belongs to the current user."""
       db_task = crud.task.get_task(db=db, task_id=task_id, owner_id=current_user.id)
       if db_task is None:
           raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
       return db_task

   @router.put("/{task_id}", response_model=schemas.tasks.Task)
   async def update_task_route_db(
       task_id: int,
       task_in: schemas.tasks.TaskUpdate,
       db: Session = Depends(get_db),
       current_user: schemas.users.User = Depends(get_current_active_user)
   ):
       """Update a task in the database belonging to the current user."""
       updated_task = crud.task.update_task(db=db, task_id=task_id, task_update=task_in, owner_id=current_user.id)
       if updated_task is None:
           raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found or not authorized to update")
       return updated_task

   @router.delete("/{task_id}", response_model=schemas.tasks.Task)  # Return deleted task info
   async def delete_task_route_db(
       task_id: int,
       db: Session = Depends(get_db),
       current_user: schemas.users.User = Depends(get_current_active_user)
   ):
       """
       Delete a task from the database belonging to the current user.
       Returns the deleted task data. Use status_code=204 if no body is desired.
       """
       deleted_task = crud.task.delete_task(db=db, task_id=task_id, owner_id=current_user.id)
       if deleted_task is None:
           raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found or not authorized to delete")
       # Return the data of the deleted task (optional, alternative is 204 No Content)
       return deleted_task

   # --- Need User CRUD ---
   # You would also need CRUD functions and potentially routes for user management
   # (e.g., user registration) similar to how tasks are handled.
   # For now, the /token endpoint uses the fake_users_db in security.py.
   # A full implementation would replace fake_users_db with crud.user functions.
   ```

   Self-correction: Ensured both `db: Session` and `current_user` dependencies are injected. Renamed routes slightly (`_db`) during refactoring (optional). Changed DELETE to return the deleted task model for confirmation (alternatively, use `status_code=204` and return `None`). Updated imports to use the structured `schemas` and `crud` modules. Made sure the response models (`schemas.tasks.Task`) have `Config.orm_mode = True` set in `app/schemas/tasks.py` so Pydantic can read data directly from the SQLAlchemy model attributes.
Workshop Integrating a SQL Database with the Task Management API
Project: Replace the in-memory task storage with a persistent SQLite database using SQLAlchemy.
Goal: Use SQLAlchemy ORM to define models, create tables, perform CRUD operations via a database session managed by FastAPI dependencies, and ensure data persists between server restarts.
Steps:
1. Ensure Setup: Be in the `task_manager_api` root directory. Have the structured app with JWT authentication. Virtual env active.

2. Install Dependencies:

   ```bash
   pip install sqlalchemy  # If not already installed
   # No driver needed for SQLite
   # If using PostgreSQL: pip install psycopg2-binary (or psycopg[binary])
   ```

   Add `sqlalchemy` (and a driver if needed) to `requirements.txt`.

3. Create Database Session Setup (`app/db/session.py`): Create the file as shown in the "Setting Up Database Connection" section above, using the SQLite URL: `SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"`. Make sure it includes `engine`, `SessionLocal`, `Base`, and the `get_db` dependency. Create the `app/db` directory and `app/db/__init__.py`.
. -
Define ORM Models (
app/models/task.py
,app/models/user.py
):- Create the
app/models
directory andapp/models/__init__.py
. - Create
app/models/user.py
with theUser
class definition shown in the "Defining SQLAlchemy Models" section. - Create
app/models/task.py
with theTask
class definition shown previously, ensuring it importsBase
fromapp.db.session
andUser
from.user
. EnsureForeignKey("users.id")
and relationships are set up correctly.
- Create the
5. Create CRUD Functions (`app/crud/crud_task.py`, `app/crud/crud_user.py`):
   - Create the `app/crud` directory and `app/crud/__init__.py`.
   - Create `app/crud/crud_task.py` with the `get_task`, `get_tasks`, `create_task`, `update_task`, `delete_task` functions as shown in the "Performing CRUD Operations" section. Ensure imports of `Session`, models (`app.models.*`), and schemas (`app.schemas.*`) are correct.
   - Create `app/crud/crud_user.py` to handle user operations (needed to replace `fake_users_db`):

     ```python
     # In app/crud/crud_user.py
     from sqlalchemy.orm import Session
     from typing import Optional

     from ..models.user import User as UserModel
     from ..schemas.users import UserCreate
     from ..security import get_password_hash  # Need password hashing function

     def get_user(db: Session, user_id: int) -> Optional[UserModel]:
         return db.query(UserModel).filter(UserModel.id == user_id).first()

     def get_user_by_email(db: Session, email: str) -> Optional[UserModel]:
         return db.query(UserModel).filter(UserModel.email == email).first()

     def get_user_by_username(db: Session, username: str) -> Optional[UserModel]:
         return db.query(UserModel).filter(UserModel.username == username).first()

     def create_user(db: Session, user: UserCreate) -> UserModel:
         hashed_password = get_password_hash(user.password)
         # Create SQLAlchemy model, excluding plain password
         db_user = UserModel(
             username=user.username,
             email=user.email,
             full_name=user.full_name,
             hashed_password=hashed_password,
             disabled=user.disabled if user.disabled is not None else False
         )
         db.add(db_user)
         db.commit()
         db.refresh(db_user)
         return db_user

     # Add update_user, delete_user functions as needed
     ```

     Self-correction: Added basic `get_user_by_username` and `create_user` CRUD functions. User creation now hashes the password.
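The `get_password_hash` helper used by `create_user` (defined earlier in `app/security.py`) encapsulates the hash-then-verify idea. As a standalone illustration of that idea using only the standard library (not the library the tutorial's helper actually wraps): hash with a random salt on registration, then re-derive and compare on login.

```python
import hashlib
import hmac
import os

def get_password_hash(password: str) -> str:
    """Salted PBKDF2 hash, encoded as 'salt-hex$digest-hex'."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 100_000)
    return f"{salt.hex()}${digest.hex()}"

def verify_password(plain: str, hashed: str) -> bool:
    """Re-derive the digest with the stored salt and compare in constant time."""
    salt_hex, digest_hex = hashed.split("$")
    candidate = hashlib.pbkdf2_hmac("sha256", plain.encode(), bytes.fromhex(salt_hex), 100_000)
    return hmac.compare_digest(candidate.hex(), digest_hex)

stored = get_password_hash("password123")  # What goes into hashed_password
print(verify_password("password123", stored), verify_password("wrong", stored))
```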
6. Update Security Utilities (`app/security.py`): Modify `get_user_from_db` to use the new `crud.user.get_user_by_username` function, injecting the DB session into it. Also update the `/token` endpoint dependency.

   ```python
   # In app/security.py
   # ... (keep imports: Depends, HTTPException, status, OAuth2PasswordBearer, ...)
   from sqlalchemy.orm import Session  # Import Session
   from .db.session import get_db  # Import DB dependency
   from . import crud  # Import crud module

   # ... (keep pwd_context, verify_password, get_password_hash, ...)
   # ... (keep JWT functions: create_access_token, decode_access_token)
   # ... (keep oauth2_scheme)

   # --- Dependency to Get Current User (DATABASE VERSION) ---
   # Remove fake_users_db

   # Modify get_user_from_db to use CRUD and DB session
   def get_user_from_db(db: Session, username: str) -> Optional[crud.UserModel]:  # Return SQLAlchemy model
       """Fetches user data from DB using CRUD function."""
       return crud.user.get_user_by_username(db=db, username=username)

   async def get_current_user(
       token: str = Depends(oauth2_scheme),
       db: Session = Depends(get_db)  # <<< Inject DB session here too
   ) -> schemas.users.User:  # Return Pydantic schema
       # ... (keep credentials_exception definition) ...
       token_data = decode_access_token(token)
       if token_data is None or token_data.username is None:
           raise credentials_exception
       # Fetch user using CRUD function and the injected DB session
       user_model = get_user_from_db(db=db, username=token_data.username)  # Get SQLAlchemy model
       if user_model is None:
           raise credentials_exception
       # Convert SQLAlchemy model to Pydantic schema User for return
       # Requires orm_mode=True in schemas.users.User.Config
       user_schema = schemas.users.User.from_orm(user_model)
       return user_schema

   # Modify get_current_active_user to correctly pass the user schema
   async def get_current_active_user(
       current_user: schemas.users.User = Depends(get_current_user)  # Depends on the updated get_current_user
   ) -> schemas.users.User:
       if current_user.disabled:
           raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user")
       return current_user
   ```

   Self-correction: Injected `db: Session = Depends(get_db)` into `get_current_user`. Modified `get_user_from_db` to call the CRUD function. Updated return type hints and ensured conversion from ORM model to Pydantic schema using `from_orm`.
Update Auth Router (
app/routers/auth.py
): Inject the database session into the/token
endpoint and use the CRUD function to find the user.Self-correction: Injected# In app/routers/auth.py # ... (keep imports: APIRouter, Depends, HTTPException, status, ...) from sqlalchemy.orm import Session # Import Session from ..db.session import get_db # Import DB dependency from ..security import verify_password, create_access_token, ACCESS_TOKEN_EXPIRE_MINUTES from .. import crud # Import CRUD module from .. import schemas # Import Schemas module router = APIRouter(tags=["Authentication"]) @router.post("/token", response_model=schemas.tokens.Token) async def login_for_access_token_db( # Renamed form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db) # <<< Inject DB Session ): # Find user in DB using CRUD function user = crud.user.get_user_by_username(db=db, username=form_data.username) # Use CRUD # Check if user exists and password is correct (using user.hashed_password from DB model) if not user or not verify_password(form_data.password, user.hashed_password): raise HTTPException(...) # Keep exception logic if user.disabled: raise HTTPException(...) # Keep exception logic # Create token (using user.username from DB model) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} # --- Add User Registration Endpoint --- @router.post("/register", response_model=schemas.users.User, status_code=status.HTTP_201_CREATED) async def register_user( user_in: schemas.users.UserCreate, db: Session = Depends(get_db) ): """ Register a new user. 
""" # Check if user already exists db_user_by_username = crud.user.get_user_by_username(db, username=user_in.username) if db_user_by_username: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered") if user_in.email: db_user_by_email = crud.user.get_user_by_email(db, email=user_in.email) if db_user_by_email: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered") # Create user using CRUD function new_user = crud.user.create_user(db=db, user=user_in) # Return the created user data (Pydantic model handles ORM conversion) return new_user
db: Session
into/token
. Usedcrud.user.get_user_by_username
. Added a basic/register
endpoint usingcrud.user.create_user
for testing. -
Update Task Router (
app/routers/tasks.py
): Ensure it uses the correct imports for schemas, crud functions,get_db
, andget_current_active_user
as shown in the "Integrating with FastAPI using Dependencies" -> Step 2 section above. Remove the temporary globaluser_tasks_db
andnext_task_id_global
. Ensure theresponse_model
uses the Pydantic schema (e.g.,schemas.tasks.Task
) and that the schema hasConfig.orm_mode = True
. -
Update Main App (
app/main.py
): Ensure thecreate_all
call is present to initialize the database tables.Self-correction: Adjusted import path for# In app/main.py from fastapi import FastAPI from .routers import tasks, auth from .db import session, models # Correct import path # Create DB tables (only if they don't exist) # Use Alembic for migrations in production! try: models.Base.metadata.create_all(bind=session.engine) print("Database tables created successfully (if they didn't exist).") except Exception as e: print(f"Error creating database tables: {e}") app = FastAPI( title="Task Management API with DB", description="API with JWT authentication and persistent SQLite database.", version="0.5.0", ) app.include_router(auth.router) app.include_router(tasks.router) @app.get("/") async def read_root(): return {"message": "Welcome to the Task Management API with Database!"}
session, models
. Added a simple print/try-except aroundcreate_all
. -
Run and Test:
- Stop the currently running server if any.
- Run:
uvicorn app.main:app --reload --port 8000
- Observe the console output. You should see the message about tables being created (the first time). A file named
sql_app.db
should appear in your project root directory. - Open Swagger UI:
http://127.0.0.1:8000/docs
- Register: Use the
POST /register
endpoint to create a new user (e.g., usernamedbuser
, passwordpassword123
). - Login: Use the
POST /token
endpoint with the credentials you just registered to get a JWT. - Authorize: Use the "Authorize" button in Swagger UI, paste the token into the Bearer field.
- Test Tasks:
POST /tasks
to create tasks for the logged-in user.GET /tasks
to retrieve them.GET /tasks/{task_id}
,PUT /tasks/{task_id}
,DELETE /tasks/{task_id}
.
- Verify Persistence: Stop the Uvicorn server (
Ctrl+C
) and restart it (uvicorn app.main:app --reload --port 8000
). Login again (get a new token) and authorize. UseGET /tasks
. Your previously created tasks should still be there, loaded from thesql_app.db
file.
You have successfully integrated a persistent SQL database (SQLite) into your FastAPI application using SQLAlchemy ORM. Your API now stores data reliably, uses proper session management via dependencies, and separates database logic into CRUD functions.
7. Testing FastAPI Applications
Writing automated tests is crucial for building reliable and maintainable APIs. Tests verify that your code works as expected, prevent regressions when you make changes, and serve as documentation for your API's behavior. FastAPI is designed with testability in mind and integrates seamlessly with standard Python testing frameworks like `pytest`.
Importance of Testing APIs
- Ensure Correctness: Verify that endpoints return the expected responses and status codes for valid inputs.
- Validate Error Handling: Check that appropriate error responses (e.g., 404 Not Found, 422 Unprocessable Entity, 401 Unauthorized) are returned for invalid inputs or unauthorized requests.
- Prevent Regressions: Run tests automatically (e.g., in CI/CD pipelines) to catch bugs introduced by new code changes.
- Improve Design: Writing tests often encourages better code structure (e.g., separating concerns, using dependency injection) which makes testing easier.
- Documentation: Tests demonstrate how the API is intended to be used.
Using pytest
`pytest` is a popular, feature-rich testing framework for Python. It makes writing simple tests easy and scales well for complex scenarios. Key features include:
- Simple test discovery (functions named `test_*` or classes named `Test*`).
- Powerful fixture mechanism for setup/teardown.
- Detailed information on test failures.
- Extensible with plugins.

Install `pytest` (e.g., `pip install pytest`) and add it to `requirements-dev.txt` or similar.
FastAPI's TestClient
FastAPI provides a `TestClient` class (based on `httpx`) that allows you to make requests directly to your FastAPI application in your tests without needing to run a live Uvicorn server. It interacts with your application object in memory.
How to use `TestClient`:
- Import `TestClient` from `fastapi.testclient`.
- Import your FastAPI application instance (`app` from `app.main`).
- Instantiate the client: `client = TestClient(app)`.
- Make requests using methods like `client.get()`, `client.post()`, `client.put()`, `client.delete()`. These methods mirror the `requests`/`httpx` library API:
  - `client.get("/path")`
  - `client.post("/path", json={"key": "value"})`
  - `client.put("/path/{item_id}", json={...})`
  - `client.delete("/path/{item_id}")`
  - Headers can be passed using the `headers={...}` argument.
- The client methods return a `Response` object (similar to `httpx`'s response). You can inspect:
  - `response.status_code`
  - `response.json()` (to get the JSON body)
  - `response.text` (to get the raw text body)
  - `response.headers`
Writing Tests
Tests are typically placed in a `tests/` directory at the root of your project. `pytest` will automatically discover test files (e.g., `test_main.py`, `test_tasks.py`) and test functions/methods within them.
Example: Basic Test (`tests/test_main.py`)
```python
# In tests/test_main.py
from fastapi.testclient import TestClient

# Import your FastAPI app instance
# Adjust the import path based on your structure
# This assumes tests/ is at the same level as app/
from app.main import app

# Instantiate the TestClient
client = TestClient(app)

def test_read_root():
    """Test the root endpoint '/'."""
    response = client.get("/")
    assert response.status_code == 200
    assert response.json() == {"message": "Welcome to the Task Management API with Database!"}

def test_non_existent_route():
    """Test accessing a route that does not exist."""
    response = client.get("/non-existent-path")
    # FastAPI automatically returns 404 for undefined routes
    assert response.status_code == 404
    assert response.json() == {"detail": "Not Found"}
```
Testing Endpoints with Dependencies (Database, Auth)
Testing endpoints that rely on dependencies like database sessions or authentication requires additional setup, often using `pytest` fixtures.
- Overriding Dependencies: FastAPI allows you to override dependencies during testing. This is extremely useful for replacing a real database connection with a test database or mocking authentication.
- Test Database: For database tests, you typically want:
  - A separate test database (e.g., a different SQLite file or a separate PostgreSQL database/schema).
  - A mechanism to create the required tables before tests run.
  - A way to clean up or reset the database between tests to ensure test isolation.
- Testing Authentication:
  - Test the `/token` endpoint itself to ensure it returns tokens correctly for valid credentials and errors for invalid ones.
  - For testing protected endpoints:
    - You can authenticate within the test by calling `/token` first and then using the obtained token in subsequent requests via the `headers` argument.
    - Alternatively (and often simpler for unit/integration tests of the endpoint logic itself), you can override the authentication dependency (`get_current_active_user`) to return a known dummy user object directly, bypassing the token validation logic for that specific test run.
Example: Setting up a Test Database Fixture (`tests/conftest.py`)
`pytest` automatically discovers fixtures defined in files named `conftest.py`.
```python
# In tests/conftest.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool  # Useful for in-memory SQLite testing

# Import your Base, app, schemas, models etc. Adjust paths as needed.
from app.main import app
from app.db.session import Base, get_db  # Import original get_db
from app.models.user import User as UserModel  # Import DB model
from app.schemas.users import UserCreate  # Import Pydantic schema
from app.security import get_password_hash  # Import hashing utility
from app import crud  # Import crud functions

# --- Test Database Setup ---
# Use an in-memory SQLite database for isolated testing
TEST_SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
# Or use a file: TEST_SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"

engine = create_engine(
    TEST_SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},  # Needed for SQLite
    poolclass=StaticPool,  # Use StaticPool for in-memory DB with TestClient
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Create tables before tests start, drop them after (using a yield fixture)
@pytest.fixture(scope="session", autouse=True)
def create_test_tables():
    """Creates DB tables before tests run and drops them after."""
    print("\nCreating test database tables...")
    Base.metadata.create_all(bind=engine)
    yield  # Tests run here
    print("\nDropping test database tables...")
    Base.metadata.drop_all(bind=engine)
    # If using a file DB, you might want os.remove('./test.db') here

# --- Dependency Override Fixture ---
@pytest.fixture(scope="function")  # function scope ensures a clean DB for each test
def override_get_db():
    """Fixture to override the get_db dependency with a test session."""
    db = TestingSessionLocal()
    try:
        yield db
    finally:
        db.close()

# --- Test Client Fixture ---
# This fixture applies the dependency override automatically
@pytest.fixture(scope="function")
def client(override_get_db):
    """Provides a TestClient instance with the DB dependency overridden."""
    # Define the override function for get_db
    def _override_get_db():
        try:
            yield override_get_db  # Yield the test session from the other fixture
        finally:
            pass  # Closing is handled by the override_get_db fixture
    # Apply the override to the FastAPI app instance
    app.dependency_overrides[get_db] = _override_get_db
    # Create the TestClient
    with TestClient(app) as test_client:
        yield test_client
    # Clean up the override after the test (important!)
    app.dependency_overrides.clear()

# --- Optional: Fixtures for an Authenticated Client ---
@pytest.fixture(scope="function")
def test_user(override_get_db):
    """Creates a test user in the DB for authentication tests."""
    db = override_get_db
    user_data = UserCreate(
        username="testuser_fixture",
        email="test_fixture@example.com",
        password="password123",
    )
    user = crud.user.create_user(db=db, user=user_data)
    return {"username": user.username, "password": "password123"}  # Return credentials

@pytest.fixture(scope="function")
def authenticated_client(client, test_user):
    """Provides a TestClient instance that is already authenticated."""
    # Get a token for the test_user
    response = client.post(
        "/token",
        data={"username": test_user["username"], "password": test_user["password"]},
        # Use data= for form encoding, not json=
    )
    assert response.status_code == 200, f"Failed to get token: {response.text}"
    token_data = response.json()
    token = token_data["access_token"]
    # Set the Authorization header for this client instance
    client.headers = {
        "Authorization": f"Bearer {token}"
    }
    return client  # Return the client, now with auth headers set
```
A few notes on this fixture setup. It uses `sqlite:///:memory:` for faster, isolated tests, together with `StaticPool`, which is recommended when using in-memory SQLite with `TestClient`. Overriding the database (`override_get_db`) and providing the client (`client`) are separate fixtures; the `client` fixture applies the dependency override and then clears it. The `test_user` and `authenticated_client` fixtures simplify testing authenticated endpoints, and the `/token` request uses `data` (form encoding), not `json`. The client and DB override fixtures use `scope="function"` to ensure test isolation (each test gets a fresh DB session and client), while table creation/deletion uses `scope="session"` because it only needs to happen once per test run.
Example: Testing Task Endpoints (`tests/test_tasks.py`)
```python
# In tests/test_tasks.py
import pytest  # Import pytest if using markers like @pytest.mark...

# Fixtures 'client' and 'authenticated_client' are automatically available
# if defined in tests/conftest.py

def test_create_task(authenticated_client):
    """Test creating a task when authenticated."""
    response = authenticated_client.post(
        "/tasks/",
        json={"title": "Test Task Title", "description": "Test Task Description"},
    )
    assert response.status_code == 201, response.text
    data = response.json()
    assert data["title"] == "Test Task Title"
    assert data["description"] == "Test Task Description"
    assert data["completed"] is False
    assert "id" in data
    # Check owner_id? Requires modification to schema or fetching user first

def test_create_task_unauthenticated(client):
    """Test creating a task without authentication."""
    response = client.post(
        "/tasks/",
        json={"title": "Unauthorized Task"},
    )
    # Expect 401 Unauthorized because the 'client' fixture is unauthenticated
    assert response.status_code == 401
    assert "WWW-Authenticate" in response.headers  # Check for standard header

def test_read_tasks(authenticated_client):
    """Test reading tasks when authenticated."""
    # Create tasks first to ensure there's data
    authenticated_client.post("/tasks/", json={"title": "Task 1"})
    authenticated_client.post("/tasks/", json={"title": "Task 2"})
    response = authenticated_client.get("/tasks/")
    assert response.status_code == 200
    data = response.json()
    assert isinstance(data, list)
    # Check if the created tasks are in the list (consider pagination/order)
    assert len(data) >= 2  # Assuming clean DB per test due to fixture scope
    assert any(task["title"] == "Task 1" for task in data)
    assert any(task["title"] == "Task 2" for task in data)

def test_read_specific_task(authenticated_client):
    """Test reading a specific task that belongs to the user."""
    # Create a task
    create_response = authenticated_client.post("/tasks/", json={"title": "Specific Task"})
    assert create_response.status_code == 201
    task_id = create_response.json()["id"]
    # Read it back
    response = authenticated_client.get(f"/tasks/{task_id}")
    assert response.status_code == 200
    data = response.json()
    assert data["id"] == task_id
    assert data["title"] == "Specific Task"

def test_read_non_existent_task(authenticated_client):
    """Test reading a task ID that does not exist."""
    response = authenticated_client.get("/tasks/99999")
    assert response.status_code == 404

def test_update_task(authenticated_client):
    """Test updating a task."""
    # Create a task
    create_response = authenticated_client.post("/tasks/", json={"title": "Task to Update"})
    task_id = create_response.json()["id"]
    # Update it
    update_response = authenticated_client.put(
        f"/tasks/{task_id}",
        json={"title": "Updated Task Title", "completed": True},
    )
    assert update_response.status_code == 200
    data = update_response.json()
    assert data["title"] == "Updated Task Title"
    assert data["completed"] is True
    assert data["id"] == task_id
    # Verify the update by reading again
    read_response = authenticated_client.get(f"/tasks/{task_id}")
    assert read_response.status_code == 200
    assert read_response.json()["title"] == "Updated Task Title"

def test_delete_task(authenticated_client):
    """Test deleting a task."""
    # Create a task
    create_response = authenticated_client.post("/tasks/", json={"title": "Task to Delete"})
    task_id = create_response.json()["id"]
    # Delete it
    delete_response = authenticated_client.delete(f"/tasks/{task_id}")
    # Assuming the endpoint returns the deleted item (status 200)
    # If it returns 204 No Content, adjust the assert
    assert delete_response.status_code == 200  # Or 204
    # if delete_response.status_code == 200:
    #     assert delete_response.json()["id"] == task_id
    # Verify deletion by trying to read it again
    read_response = authenticated_client.get(f"/tasks/{task_id}")
    assert read_response.status_code == 404  # Should be Not Found now

# Add tests for edge cases: invalid input (422), updating/deleting non-existent tasks (404),
# trying to access tasks belonging to other users (requires more complex setup or mocking).
```
Running Tests
Navigate to your project's root directory (the one containing `app/` and `tests/`) in your Linux terminal (with the virtual environment activated) and run `pytest`:
```bash
pytest
# Or for more verbose output:
pytest -v
# To run tests only in a specific file:
pytest tests/test_tasks.py
# To run tests matching a specific name (using -k):
pytest -k create
```
`pytest` will discover your `tests/` directory, `conftest.py`, and test files, execute the tests, apply fixtures, and report the results.
Workshop: Writing Tests for the Task Management API
Project: Add comprehensive tests for the database-integrated, authenticated Task Management API using `pytest` and `TestClient`.
Goal: Write tests covering user registration, login, and all CRUD operations for tasks, including authentication checks and basic error handling validation. Utilize fixtures for test setup (database, authenticated client).
Steps:
1. Ensure Setup: Be in the `task_manager_api` root directory. Have the fully functional API with database integration and JWT auth. Virtual env active.
2. Install `pytest`:
   ```bash
   pip install pytest
   ```
3. Create the `tests/` Directory and `__init__.py`:
   ```bash
   mkdir -p tests
   touch tests/__init__.py
   ```
4. Create `tests/conftest.py`: Copy the fixture setup code provided in the "Example: Setting up a Test Database Fixture" section above into `tests/conftest.py`. Ensure all imports (`app`, `Base`, `get_db`, models, schemas, etc.) correctly point to your application's structure (e.g., `from app.main import app`, `from app.db.session import Base, get_db`). Double-check the `TEST_SQLALCHEMY_DATABASE_URL`.
5. Create `tests/test_auth.py`: Write tests specifically for the authentication endpoints (`/register`, `/token`).
   ```python
   # In tests/test_auth.py
   # client fixture is automatically available from conftest.py

   def test_register_user(client):
       """Test successful user registration."""
       response = client.post(
           "/register",
           json={
               "username": "testregister",
               "email": "testregister@example.com",
               "password": "password123",
           },
       )
       assert response.status_code == 201, response.text
       data = response.json()
       assert data["username"] == "testregister"
       assert data["email"] == "testregister@example.com"
       assert "id" in data
       assert "hashed_password" not in data  # Ensure password hash isn't returned

   def test_register_user_duplicate_username(client):
       """Test registration with an existing username."""
       # Register user once
       client.post("/register", json={"username": "duplicate", "password": "pw1"})
       # Try registering again with the same username
       response = client.post(
           "/register",
           json={"username": "duplicate", "password": "pw2"},
       )
       assert response.status_code == 400
       assert "Username already registered" in response.json()["detail"]

   def test_register_user_duplicate_email(client):
       """Test registration with an existing email."""
       client.post("/register", json={"username": "user1", "email": "duplicate@example.com", "password": "pw1"})
       response = client.post(
           "/register",
           json={"username": "user2", "email": "duplicate@example.com", "password": "pw2"},
       )
       assert response.status_code == 400
       assert "Email already registered" in response.json()["detail"]

   def test_login_for_access_token(client):
       """Test successful login and token retrieval."""
       # Register user first (using a unique username for this test)
       username = "testloginuser"
       password = "password123"
       client.post("/register", json={"username": username, "password": password})
       # Attempt login
       response = client.post(
           "/token",
           data={"username": username, "password": password},  # Use data for form encoding
       )
       assert response.status_code == 200, response.text
       data = response.json()
       assert "access_token" in data
       assert data["token_type"] == "bearer"

   def test_login_incorrect_password(client):
       """Test login with incorrect password."""
       username = "testlogin_wrongpw"
       password = "password123"
       client.post("/register", json={"username": username, "password": password})
       response = client.post(
           "/token",
           data={"username": username, "password": "wrongpassword"},
       )
       assert response.status_code == 401
       assert "Incorrect username or password" in response.json()["detail"]
       assert "WWW-Authenticate" in response.headers

   def test_login_nonexistent_user(client):
       """Test login with a username that does not exist."""
       response = client.post(
           "/token",
           data={"username": "nosuchuser", "password": "password"},
       )
       assert response.status_code == 401  # Should fail authentication
       assert "Incorrect username or password" in response.json()["detail"]

   def test_read_users_me(authenticated_client, test_user):
       """Test the /users/me endpoint with an authenticated client."""
       # test_user fixture provides credentials used by authenticated_client
       response = authenticated_client.get("/users/me")
       assert response.status_code == 200
       data = response.json()
       assert data["username"] == test_user["username"]  # Check against the fixture user
       assert data["email"] is not None  # Assuming test_user fixture creates email
       assert "id" in data

   def test_read_users_me_unauthenticated(client):
       """Test accessing /users/me without authentication."""
       response = client.get("/users/me")
       assert response.status_code == 401  # Expect authentication error
   ```
6. Create `tests/test_tasks.py`: Copy the task endpoint test code provided in the "Example: Testing Task Endpoints" section above into `tests/test_tasks.py`. This code already utilizes the `client` and `authenticated_client` fixtures from `conftest.py`.
7. Run Tests:
   - Make sure you are in the `task_manager_api` root directory.
   - Activate your virtual environment (`source venv/bin/activate`).
   - Run pytest:
     ```bash
     pytest -v
     ```
   - Observe the output. `pytest` should discover and run all tests in `test_auth.py` and `test_tasks.py`. You should see output indicating the creation and dropping of test database tables (from the `conftest.py` fixture). All tests should pass if the code and fixtures are set up correctly. Fix any failures by debugging the test or the application code.
You have now implemented a robust test suite for your FastAPI application, covering authentication and core CRUD functionality. These tests ensure your API behaves correctly and provide a safety net against regressions as you continue development.
8. Deployment on Linux
Developing your FastAPI application locally is just the first step. To make it accessible to users or other services, you need to deploy it to a server, typically running Linux. Deployment involves packaging your application, running it using a production-grade server, managing the process, and potentially setting up a reverse proxy for security and performance.
Overview of Deployment Strategies
- Directly on VM/Server: Install Python, dependencies, a process manager (like `systemd`), a production ASGI server (like Uvicorn managed by Gunicorn), and potentially a reverse proxy (like Nginx) directly onto a Linux virtual machine or physical server.
- Containerization (Docker): Package your application, its dependencies, and the runtime environment into a Docker container image. Run this container on any machine with Docker installed. Often orchestrated using `docker-compose` (for single-host) or Kubernetes/Docker Swarm (for multi-host clusters). This is the most common and recommended approach for modern applications due to its portability and consistency.
- Platform-as-a-Service (PaaS): Use cloud platforms like Heroku, Google App Engine, AWS Elastic Beanstalk, or Azure App Service. These platforms abstract away much of the server management, allowing you to deploy your code (often via Git or uploading a package) more easily. They handle scaling, load balancing, etc., but offer less control than managing your own VM or containers.
- Serverless: Deploy individual API endpoints as functions (e.g., AWS Lambda, Google Cloud Functions, Azure Functions) fronted by an API Gateway. Suitable for specific use cases and can be cost-effective for low-traffic APIs, but requires a different architectural approach.
We will focus on Containerization with Docker combined with Nginx as a reverse proxy, a very common and powerful setup for deploying FastAPI applications on Linux.
WSGI/ASGI Servers for Production
While `uvicorn main:app --reload` is great for development, it's not suitable for production. You need a robust ASGI server setup capable of handling multiple worker processes for concurrency, managed reliably.
- Uvicorn: A high-performance ASGI server. Can be run directly but often managed by Gunicorn for better process management.
- Gunicorn: A mature, widely-used WSGI server. While originally for WSGI, it can manage Uvicorn workers to run ASGI applications like FastAPI. Gunicorn handles worker management (spawning multiple processes, restarting failed ones), signal handling, and graceful shutdowns.
- Hypercorn: Another ASGI server, supports HTTP/2 and HTTP/3.
Recommended Setup: Gunicorn managing Uvicorn workers:
```bash
# Install Gunicorn
pip install gunicorn
# Run command (example)
gunicorn -w 4 -k uvicorn.workers.UvicornWorker app.main:app -b 0.0.0.0:8000
```
- `gunicorn`: The command to run Gunicorn.
- `-w 4`: Number of worker processes. A common starting point is `(2 * number_of_cpu_cores) + 1`. Adjust based on load testing. Each worker runs an independent Uvicorn event loop.
- `-k uvicorn.workers.UvicornWorker`: Specifies the worker class to use. Tells Gunicorn to use Uvicorn to handle requests within each worker process.
- `app.main:app`: Path to your FastAPI application instance (the `app` object inside `app/main.py`).
- `-b 0.0.0.0:8000`: Bind address. Tells Gunicorn to listen on port 8000 on all available network interfaces (essential when running inside a container).
Containerization with Docker
Docker allows you to package your application and all its dependencies (Python runtime, libraries, system tools) into a standardized unit called a container image. This image can then be run consistently across different environments (developer machine, testing server, production server).
Key Docker Concepts:
- Dockerfile: A text file containing instructions to build a Docker image. Defines the base image, copies application code, installs dependencies, sets environment variables, exposes ports, and specifies the command to run when a container starts.
- Image: A read-only template containing the application and its environment. Built from a Dockerfile.
- Container: A runnable instance of an image. It's isolated from the host system and other containers.
- Docker Engine: The background service that builds, runs, and manages containers.
- Docker Hub / Registry: A place to store and share Docker images.
Creating a `Dockerfile` for FastAPI:
```dockerfile
# Dockerfile
# 1. Use an official Python runtime as a parent image
# Choose a specific version for reproducibility, slim variant for smaller size
FROM python:3.11-slim

# 2. Set environment variables
# Prevents Python from writing pyc files to disc (optional)
ENV PYTHONDONTWRITEBYTECODE 1
# Prevents Python from buffering stdout and stderr (important for logs)
ENV PYTHONUNBUFFERED 1

# 3. Set the working directory in the container
WORKDIR /app

# 4. Install system dependencies if needed (e.g., for psycopg2)
# RUN apt-get update && apt-get install -y --no-install-recommends some-package && rm -rf /var/lib/apt/lists/*

# 5. Copy requirements file and install Python dependencies
# Copy only requirements first to leverage the Docker cache
COPY requirements.txt .
# Consider using a virtual environment inside the container too (optional but good practice)
# RUN python -m venv /venv && . /venv/bin/activate
RUN pip install --no-cache-dir -r requirements.txt
# If using venv: RUN /venv/bin/pip install --no-cache-dir -r requirements.txt

# 6. Copy the rest of the application code into the container
COPY ./app /app/app
# If you have other root files (like sql_app.db if you want to include it, though not ideal)
# COPY ./sql_app.db /app/sql_app.db

# 7. Expose the port the app runs on (informational)
# This doesn't actually publish the port; the -p flag in 'docker run' does that.
EXPOSE 8000

# 8. Define the command to run the application
# Use the Gunicorn command for production
# Ensure app.main:app points to your FastAPI instance correctly
# Use CMD for the default command; it can be overridden
CMD ["gunicorn", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "app.main:app", "--bind", "0.0.0.0:8000"]
# If using venv inside container:
# CMD ["/venv/bin/gunicorn", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "app.main:app", "--bind", "0.0.0.0:8000"]
```
A few notes on this `Dockerfile`: `--no-cache-dir` keeps pip from caching downloads, reducing image size; `COPY ./app /app/app` copies only the application package into the image; and the `CMD` uses the production Gunicorn command rather than the development `uvicorn --reload`.
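Because `COPY` pulls in everything the build context contains, it is also worth excluding files the image does not need. A minimal `.dockerignore` sketch (file names assumed from this project's layout):

```
venv/
tests/
__pycache__/
*.pyc
.git/
sql_app.db
```

Placing this next to the `Dockerfile` keeps the virtual environment, test suite, and local database file out of the image and speeds up builds.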
Building and Running the Docker Image:
1. Create `requirements.txt`: Make sure you have a `requirements.txt` file listing all dependencies.
2. Build the Image: Run this command in the directory containing your `Dockerfile` and application code.
   ```bash
   docker build -t your-api-image-name:latest .
   ```
   - `-t your-api-image-name:latest`: Tags the image with a name and tag (e.g., `latest`).
   - `.`: Specifies the build context (the current directory).
3. Run the Container:
   ```bash
   docker run -d --name task-api-container -p 8080:8000 your-api-image-name:latest
   ```
   - `-d`: Run the container in detached mode (in the background).
   - `--name task-api-container`: Assigns a name to the running container.
   - `-p 8080:8000`: Maps port 8080 on the host machine to port 8000 inside the container (where Gunicorn is listening). You can access the API via `http://localhost:8080` or `http://<your-linux-ip>:8080`.
   - `your-api-image-name:latest`: The image to run.
Using `docker-compose` (Optional but Recommended)
`docker-compose` is a tool for defining and running multi-container Docker applications. It uses a YAML file (`docker-compose.yml`) to configure the application's services, networks, and volumes. This is useful if your application requires other services like a database or Redis.
```yaml
# docker-compose.yml
version: '3.8'  # Specify compose file version

services:
  # API service (your FastAPI app)
  api:
    build: .  # Build the image from the Dockerfile in the current directory
    image: task-manager-api-compose:latest  # Optional: tag the built image
    container_name: task-manager-api-service
    # If Dockerfile exposes 8000, map host port 80 to container port 8000
    ports:
      - "80:8000"  # Map host 80 to container 8000
    volumes:
      # Mount the database file from host to container for persistence
      # Adjust path if your DB file is elsewhere or named differently
      # Note: For production, prefer a managed DB service or a separate DB container
      - ./sql_app.db:/app/sql_app.db  # Mount SQLite file
    environment:
      # Pass environment variables to the container (e.g., for secrets)
      # - DATABASE_URL=postgresql://user:pass@db:5432/appdb
      # - SECRET_KEY=your_production_secret_key
      - APP_ENV=production
    # depends_on:  # Uncomment if you add a database service
    #   - db
    restart: unless-stopped  # Restart policy

  # Example: Add a PostgreSQL database service (optional)
  # db:
  #   image: postgres:15-alpine
  #   container_name: task-manager-db
  #   volumes:
  #     - postgres_data:/var/lib/postgresql/data/  # Persistent volume for DB data
  #   environment:
  #     - POSTGRES_USER=user
  #     - POSTGRES_PASSWORD=password
  #     - POSTGRES_DB=appdb
  #   ports:
  #     - "5432:5432"  # Expose DB port only if needed externally (usually not)
  #   restart: unless-stopped

# Define persistent volumes (if using DB service)
# volumes:
#   postgres_data:
```
To use `docker-compose`:
1. Install `docker-compose` (often included with Docker Desktop, or installed separately on Linux: `sudo apt install docker-compose` or `sudo dnf install docker-compose-plugin`).
2. Save the YAML content as `docker-compose.yml` in your project root.
3. Run commands from the same directory:
   - `docker-compose build`: Build the images defined in the file.
   - `docker-compose up -d`: Start the services in detached mode.
   - `docker-compose down`: Stop and remove the containers.
   - `docker-compose logs -f api`: View logs for the `api` service.
Reverse Proxy (Nginx)
While Gunicorn/Uvicorn serve your application, it's best practice to put a reverse proxy like Nginx in front of them.
Benefits of Nginx:
- Load Balancing: Distribute incoming traffic across multiple instances/workers of your FastAPI application.
- HTTPS/SSL Termination: Handle HTTPS encryption/decryption, freeing your application server from this task.
- Serving Static Files: Efficiently serve static assets (CSS, JS, images) directly, reducing load on your API server.
- Caching: Cache responses to improve performance.
- Security: Can provide rate limiting, IP blocking, and handle certain types of attacks.
- Compression: Compress responses (e.g., using gzip).
Basic Nginx Configuration (`nginx.conf`):
```nginx
# Basic Nginx configuration to proxy requests to the FastAPI app
# /etc/nginx/sites-available/your_app or similar location
server {
    listen 80;  # Listen on port 80 for HTTP traffic
    # For HTTPS (recommended), listen on 443 ssl; configure ssl_certificate and ssl_certificate_key
    # listen 443 ssl http2;
    # ssl_certificate /path/to/your/fullchain.pem;
    # ssl_certificate_key /path/to/your/privkey.pem;
    # include /path/to/options-ssl-nginx.conf;  # SSL hardening options
    # ssl_dhparam /path/to/ssl-dhparams.pem;

    server_name your_domain.com www.your_domain.com localhost 127.0.0.1;  # Your server's domain or IP

    # Optional: Add access and error logs
    access_log /var/log/nginx/your_app.access.log;
    error_log /var/log/nginx/your_app.error.log;

    location / {
        # Proxy requests to the FastAPI application server
        # Assuming the FastAPI app (Gunicorn/Uvicorn) is running on port 8000 on the same machine
        # If using a Docker Compose network, use the service name: proxy_pass http://api:8000;
        proxy_pass http://127.0.0.1:8000;

        # Set headers to pass necessary information to the backend application
        proxy_set_header Host $host;  # Pass the original host header
        proxy_set_header X-Real-IP $remote_addr;  # Pass the real client IP
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;  # Pass list of IPs if behind multiple proxies
        proxy_set_header X-Forwarded-Proto $scheme;  # Pass the original protocol (http or https)

        # Optional: Increase timeouts for long-running requests if needed
        # proxy_connect_timeout 60s;
        # proxy_send_timeout 60s;
        # proxy_read_timeout 60s;
    }

    # Optional: Location block for serving static files directly (if any)
    # location /static {
    #     alias /path/to/your/static/files;
    #     expires 1d;  # Cache static files for 1 day
    #     access_log off;  # Don't log access to static files
    # }

    # Optional: Handle specific errors
    # error_page 500 502 503 504 /50x.html;
    # location = /50x.html {
    #     root /usr/share/nginx/html;
    # }
}
```
Setting up Nginx:
1. Install Nginx: `sudo apt install nginx` or `sudo dnf install nginx`.
2. Create a configuration file for your site (e.g., `/etc/nginx/sites-available/task-manager-api`).
3. Paste and customize the configuration above.
4. Enable the site by creating a symbolic link: `sudo ln -s /etc/nginx/sites-available/task-manager-api /etc/nginx/sites-enabled/`
5. Test the Nginx configuration: `sudo nginx -t`
6. Reload Nginx to apply changes: `sudo systemctl reload nginx` or `sudo service nginx reload`.
Now, requests to your server on port 80 (or 443 for HTTPS) will be forwarded by Nginx to your FastAPI application running on port 8000. Your application only needs to listen on `127.0.0.1:8000` if Nginx is on the same machine, or `0.0.0.0:8000` if running in a container accessed by Nginx.
Process Management (Systemd)
If you deploy directly on a VM (not using Docker), you need a way to ensure your Gunicorn/Uvicorn process starts automatically on boot and restarts if it crashes. `systemd` is the standard init system and service manager on most modern Linux distributions.
Example `systemd` Service File (`/etc/systemd/system/fastapi-app.service`):
```ini
[Unit]
Description=FastAPI Task Manager Application (Gunicorn/Uvicorn)
# Start after the network is available
After=network.target

[Service]
# User and group to run the service as (create a dedicated user if needed)
User=your_app_user
Group=your_app_group
# Working directory where your application code and venv reside
WorkingDirectory=/path/to/your/task_manager_api
# Environment variables (e.g., secrets, DB URL) - consider an .env file or other secret management
# Environment="SECRET_KEY=your_production_secret"
# Environment="DATABASE_URL=..."
# Command to start the service
# Adjust the path to gunicorn/python in your virtualenv
ExecStart=/path/to/your/task_manager_api/venv/bin/gunicorn \
    --workers 4 \
    --worker-class uvicorn.workers.UvicornWorker \
    --bind 0.0.0.0:8000 \
    app.main:app
# Restart the service if it fails
Restart=on-failure
RestartSec=5s
# Optional: Logging configuration (default is journald)
# StandardOutput=syslog
# StandardError=syslog
# SyslogIdentifier=fastapi-app

[Install]
# Enable the service to start on boot
WantedBy=multi-user.target
```
Using the `systemd` Service:
1. Create the `.service` file in `/etc/systemd/system/`.
2. Replace placeholders (`your_app_user`, paths, command) with your actual values.
3. Reload `systemd` to recognize the new service: `sudo systemctl daemon-reload`.
4. Enable the service to start on boot: `sudo systemctl enable fastapi-app.service`.
5. Start the service immediately: `sudo systemctl start fastapi-app.service`.
6. Check the status: `sudo systemctl status fastapi-app.service`.
7. View logs: `sudo journalctl -u fastapi-app.service -f` (follow logs).
8. Stop/Restart: `sudo systemctl stop fastapi-app.service`, `sudo systemctl restart fastapi-app.service`.
Workshop: Deploying the Task Management API using Docker and Nginx
Project: Package the Task Management API into a Docker image and configure Nginx to serve it.
Goal: Create a `Dockerfile`, build an image, run the container, and set up a basic Nginx reverse proxy configuration on your Linux machine to access the API through Nginx.
Steps:
1. Ensure Setup: Be in the `task_manager_api` root directory. Have the working API code, virtual environment, and Docker installed on your Linux system.
2. Create `requirements.txt`:
   ```bash
   source venv/bin/activate
   pip freeze > requirements.txt
   # Optional: Clean up requirements.txt, keeping only necessary packages:
   # fastapi, uvicorn, gunicorn, sqlalchemy, psycopg2-binary (if using postgres),
   # python-jose[cryptography], passlib[bcrypt], pydantic[email] (if using EmailStr)
   # Remove pytest, etc.
   deactivate
   ```
3. Create `Dockerfile`: Create a file named `Dockerfile` in the project root directory. Copy the example `Dockerfile` content provided in the "Containerization with Docker" section above. Make sure `COPY ./app /app/app` and the `CMD` line correctly point to your application structure (`app.main:app`).
4. Build the Docker Image:
   ```bash
   docker build -t task-manager-api:local .
   ```
   Verify the image was built: `docker images | grep task-manager-api`
Run the Container (Test): Run the container directly first to ensure it works.
# Map host port 8000 to container port 8000 for direct access test docker run -d --name task-api-test -p 8000:8000 task-manager-api:local
- Wait a few seconds for Gunicorn to start.
- Test access directly via
http://127.0.0.1:8000/docs
in your browser or usingcurl http://127.0.0.1:8000/
. It should work. - Stop and remove this test container:
docker stop task-api-test && docker rm task-api-test
6. Install and Configure Nginx:
   - Install Nginx if you haven't already: `sudo apt update && sudo apt install nginx` (Debian/Ubuntu) or `sudo dnf install nginx` (Fedora/CentOS).
   - Start and enable Nginx: `sudo systemctl start nginx && sudo systemctl enable nginx`.
   - Verify Nginx is running: `sudo systemctl status nginx`. You should also be able to access the default Nginx page by visiting `http://<your-linux-ip>` or `http://localhost`.
   - Create an Nginx configuration file for your API (e.g., `sudo nano /etc/nginx/sites-available/task-api`).
   - Paste the following configuration, adjusting `server_name` if needed:
     ```nginx
     server {
         listen 80;
         server_name localhost 127.0.0.1 your_server_ip;  # Replace your_server_ip if needed

         access_log /var/log/nginx/task-api.access.log;
         error_log /var/log/nginx/task-api.error.log;

         location / {
             # Proxy requests to the Docker container's port 8000.
             # We will map host port 9000 to container 8000 to avoid conflicts on host port 8000.
             proxy_pass http://127.0.0.1:9000;
             proxy_set_header Host $host;
             proxy_set_header X-Real-IP $remote_addr;
             proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
             proxy_set_header X-Forwarded-Proto $scheme;
         }
     }
     ```
     Note that `proxy_pass` targets port 9000: in the next step we will map host port 9000 to the container's internal port 8000. This avoids potential conflicts if something else on the host tries to use port 8000.
   - Save the file (`Ctrl+X`, then `Y`, then `Enter` in `nano`).
   - Enable this site configuration: `sudo ln -s /etc/nginx/sites-available/task-api /etc/nginx/sites-enabled/`
   - Test the Nginx configuration: `sudo nginx -t`. It should report that the syntax is okay.
   - Reload Nginx to apply the new configuration: `sudo systemctl reload nginx`
7. Run the Docker Container (Behind Nginx): Now run the container again, mapping host port 9000 (which Nginx targets) to the container's internal port 8000.
   ```bash
   docker run -d --name task-api-prod -p 9000:8000 task-manager-api:local
   ```
8. Test Access Through Nginx:
   - Open your browser and navigate to `http://localhost/docs` or `http://<your-linux-ip>/docs` (using port 80, the default HTTP port Nginx is listening on).
   - You should see the FastAPI Swagger UI, now served via Nginx.
   - Test the API endpoints (register, login, tasks) through this Nginx address. Everything should work as before.
   - Check the Nginx logs (`/var/log/nginx/task-api.access.log`) and the container logs (`docker logs task-api-prod`) to see requests being processed.
You have successfully containerized your FastAPI application using Docker and deployed it behind an Nginx reverse proxy on your Linux system. This setup provides a robust, scalable, and secure foundation for serving your API in production. For a real production environment, you would also configure HTTPS in Nginx, manage secrets securely (e.g., via environment variables or a secrets manager), and potentially use Docker Compose or Kubernetes for orchestration.
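As a pointer toward that next step, a minimal `docker-compose.yml` could replace the manual `docker run` invocation. This is an illustrative sketch only: the service name, image tag, and environment variable names below are assumptions, not taken from this guide's exact setup.

```yaml
# docker-compose.yml -- illustrative sketch (names are assumptions)
services:
  api:
    image: task-api:latest        # or use `build: .` to build from the Dockerfile
    container_name: task-api-prod
    ports:
      - "127.0.0.1:9000:8000"     # expose only on localhost; Nginx proxies to it
    environment:
      - SECRET_KEY=${SECRET_KEY}  # keep secrets out of the image
      - DATABASE_URL=${DATABASE_URL}
    restart: unless-stopped
```

With such a file in place, `docker compose up -d` starts the container with the same port mapping that the Nginx configuration expects, and `restart: unless-stopped` survives reboots.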
Conclusion
Throughout this guide, we've journeyed from the fundamental concepts of APIs and REST to building, structuring, securing, testing, and deploying a sophisticated FastAPI application on Linux. You started with basic path operations and Pydantic models, progressively adding data validation, error handling, structured routing with `APIRouter`, asynchronous operations with `async/await` and `httpx`, robust JWT-based authentication and authorization, and persistent data storage using SQLAlchemy and a SQL database. Finally, you learned how to package your application using Docker and deploy it behind an Nginx reverse proxy, creating a production-ready setup.
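As a recap of what those JWT tokens actually contain, here is a stdlib-only sketch of HS256 signing and verification. This is for illustration only; in a real application you would use a maintained JWT library rather than hand-rolling this, and the function names here are made up for the example.

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    # JWT uses URL-safe base64 without padding
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def sign_jwt(payload: dict, secret: str) -> str:
    """Build a header.payload.signature token signed with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = (
        f"{b64url(json.dumps(header).encode())}."
        f"{b64url(json.dumps(payload).encode())}"
    )
    sig = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(sig)}"

def verify_jwt(token: str, secret: str) -> dict:
    """Check the signature and return the decoded payload."""
    signing_input, _, sig_b64 = token.rpartition(".")
    expected = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    actual = base64.urlsafe_b64decode(sig_b64 + "=" * (-len(sig_b64) % 4))
    # Constant-time comparison to avoid timing attacks
    if not hmac.compare_digest(expected, actual):
        raise ValueError("invalid signature")
    payload_b64 = signing_input.split(".")[1]
    return json.loads(base64.urlsafe_b64decode(payload_b64 + "=" * (-len(payload_b64) % 4)))

if __name__ == "__main__":
    token = sign_jwt({"sub": "alice"}, "secret-key")
    print(verify_jwt(token, "secret-key"))  # {'sub': 'alice'}
```

Note that the payload is only base64-encoded, not encrypted: anyone can read it, but only the holder of the secret can produce a valid signature. That is why real tokens should never carry sensitive data and why the signing key must be kept secret.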
FastAPI's modern design, leveraging Python type hints, asynchronous capabilities via ASGI, and integration with Pydantic and Starlette, makes it an exceptionally powerful and developer-friendly framework. Its high performance, automatic documentation, built-in data validation, and intuitive dependency injection system significantly accelerate development while reducing bugs.
The Task Management API project served as a practical vehicle to apply these concepts, evolving from a simple in-memory list to a database-backed, authenticated, and tested application ready for deployment.
Key Takeaways:
- FastAPI enables rapid development of high-performance Python APIs.
- Pydantic is central to FastAPI for data validation and serialization.
- `async def` and `await` are crucial for efficient I/O-bound operations.
- `APIRouter` and Dependency Injection (`Depends`) are key for structuring larger applications.
- OAuth2 with JWT is a standard and secure method for API authentication.
- SQLAlchemy provides powerful ORM capabilities for database interaction.
- Testing with `pytest` and `TestClient` is essential for reliability.
- Docker and Nginx offer a standard and robust deployment pattern on Linux.
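The async takeaway can be demonstrated without FastAPI at all: with plain `asyncio` from the standard library, three simulated I/O waits run concurrently, so the total time is roughly the longest single wait rather than the sum. The function names below are illustrative.

```python
import asyncio
import time

async def fetch(name: str, delay: float) -> str:
    # Simulate an I/O-bound call (database query, HTTP request, ...)
    await asyncio.sleep(delay)
    return f"{name} done"

async def main() -> list[str]:
    start = time.perf_counter()
    # gather() runs all three coroutines concurrently on one event loop
    results = await asyncio.gather(
        fetch("a", 0.1), fetch("b", 0.1), fetch("c", 0.1)
    )
    elapsed = time.perf_counter() - start
    # Concurrent: ~0.1 s total, not the ~0.3 s sequential calls would take
    assert elapsed < 0.25
    return results

if __name__ == "__main__":
    print(asyncio.run(main()))  # ['a done', 'b done', 'c done']
```

This is exactly why FastAPI path operations declared with `async def` can serve many slow clients at once on a single worker: while one request is waiting on the network or the database, the event loop makes progress on the others.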
Further Learning:
- FastAPI Official Documentation: The documentation (https://fastapi.tiangolo.com/) is excellent, comprehensive, and filled with examples. It's your primary resource.
- SQLAlchemy Documentation: (https://www.sqlalchemy.org/) Dive deeper into Core and ORM features, especially asynchronous usage (SQLAlchemy 2.0).
- Pydantic Documentation: (https://pydantic-docs.helpmanual.io/) Explore advanced validation techniques.
- Docker Documentation: (https://docs.docker.com/) Learn more about containerization best practices.
- Nginx Documentation: (https://nginx.org/en/docs/) Understand advanced configuration options.
- Alembic: For database schema migrations (https://alembic.sqlalchemy.org/). Essential for managing database changes in production applications instead of `Base.metadata.create_all`.
- Celery or ARQ: For background task processing if your API needs to trigger long-running jobs.
- WebSockets: FastAPI has excellent support for real-time communication using WebSockets.
Building APIs is a continuous learning process. By mastering the concepts covered here and exploring further resources, you are well-equipped to develop professional, high-quality APIs using FastAPI on Linux.